Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/10 10:10:52 UTC

[GitHub] [flink] gaoyunhaii commented on a change in pull request #18302: [FLINK-25569][core] Add decomposed Sink V2 interface

gaoyunhaii commented on a change in pull request #18302:
URL: https://github.com/apache/flink/pull/18302#discussion_r781025719



##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. Starts at 0 for the first

Review comment:
       `how often` -> `how many times` ? 

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * A service that allows to get the current processing time and register timers that will execute
+ * the given {@link ProcessingTimeCallback} when firing.
+ */
+@PublicEvolving
+public interface ProcessingTimeService {

Review comment:
       Hi @fapaul~ could you elaborate a bit on why we want to split the `ProcessingTimeService` into two classes~? I'm asking since the remaining methods seem similar to `registerTimer`, like `scheduleWithFixedDelay`. Would it be possible to directly move the original `ProcessingTimeService` into core~?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. Starts at 0 for the first
+         * attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void failedWithKnownReason(Throwable t);

Review comment:
       Are the methods expected to be called by `Committer`? If so would `failWithKnownReason` and `failWithUnknownReason` be better~? 

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Create a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+
+    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
+    @PublicEvolving
+    interface InitContext {
+        /** The first checkpoint id when an application is started. */
+        long INITIAL_CHECKPOINT_ID = 1;
+
+        /**
+         * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath,
+         * but are part of the jar file of a user job.
+         *
+         * @see UserCodeClassLoader
+         */
+        UserCodeClassLoader getUserCodeClassLoader();
+
+        /**
+         * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task
+         * thread in between record processing.
+         *
+         * <p>Note that this method should not be used per-record for performance reasons in the

Review comment:
       `Note that` -> `Notes that`?
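
The per-record caveat in this Javadoc is the important part. Below is a minimal sketch of the intended batched pattern, assuming the `MailboxExecutor` moved into flink-core keeps the `execute(command, descriptionFormat, args...)` shape of the existing runtime interface; the async client callback is a hypothetical stand-in:

```java
import org.apache.flink.api.common.operators.MailboxExecutor;

import java.util.ArrayList;
import java.util.List;

// Hypothetical fragment of an async writer: completions arrive on an I/O thread
// and are handed back to the task thread via the mailbox once per batch, not per record.
class AsyncAckHandler {
    private final MailboxExecutor mailboxExecutor;            // obtained from Sink.InitContext
    private final List<String> ackedIds = new ArrayList<>();  // task-thread-only state

    AsyncAckHandler(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    /** Invoked by the (hypothetical) async client on its own I/O thread. */
    void onBatchCompleted(List<String> batch) {
        mailboxExecutor.execute(
                () -> ackedIds.addAll(batch),                  // runs in the task thread
                "register %d acknowledged records", batch.size());
    }
}
```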

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
+ * actually commits the data. To facilitate the separation the {@link SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
+ * validated eagerly. The respective sink writers and committers are transient and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Create a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */
+    @PublicEvolving
+    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
+        /**
+         * Prepare for a commit.

Review comment:
       Prepares
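
For readers following the discussion, here is a minimal end-to-end sketch of the two-phase flow this class describes: the writer buffers records and hands them over as committables in the pre-commit step, and the committer makes them visible. The `prepareCommit()` name and the `write`/`flush` signatures are assumptions for the parts of the writer interfaces that are cut off in these hunks, and the printing committer is only illustrative:

```java
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class PrintingTwoPhaseSink implements TwoPhaseCommittingSink<String, String> {

    @Override
    public PrecommittingSinkWriter<String, String> createWriter(InitContext context) {
        return new BufferingWriter();
    }

    @Override
    public Committer<String> createCommitter() {
        // Idempotent "commit" for the sketch: printing twice is harmless.
        return new Committer<String>() {
            @Override
            public void commit(Collection<CommitRequest<String>> committables) {
                committables.forEach(r -> System.out.println("COMMIT " + r.getCommittable()));
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        return new SimpleVersionedSerializer<String>() {
            @Override
            public int getVersion() {
                return 1;
            }

            @Override
            public byte[] serialize(String obj) {
                return obj.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String deserialize(int version, byte[] serialized) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
        };
    }

    /** Buffers records between checkpoints and emits them as committables. */
    private static class BufferingWriter implements PrecommittingSinkWriter<String, String> {
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            buffer.add(element);
        }

        @Override
        public Collection<String> prepareCommit() {
            // First phase of the 2pc: stage everything written since the last checkpoint.
            List<String> committables = new ArrayList<>(buffer);
            buffer.clear();
            return committables;
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {}
    }
}
```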

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Create a {@link SinkWriter}.

Review comment:
       `Create` -> `Creates` ?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. Starts at 0 for the first
+         * attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void failedWithKnownReason(Throwable t);
+
+        /**
+         * The commit failed for unknown reason and should not be retried.
+         *
+         * <p>Currently calling this method fails the job. In the future the behaviour might be
+         * configurable.
+         */
+        void failedWithUnknownReason(Throwable t);
+
+        /**
+         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+         * permanently fail after reaching that maximum. Else the committable will be retried as
+         * long as this method is invoked after each attempt.
+         */
+        void retryLater();
+
+        /**
+         * Updates the underlying committable and retries later (see {@link #retryLater()} for a
+         * description). This method can be used if a committable partially succeeded.
+         */
+        void updateAndRetryLater(CommT committable);
+
+        /**
+         * Signals that a committable is skipped as it was committed already in a previous run. Use
+         * of this method is optional but eases bookkeeping and debugging. It also serves as a code
+         * documentation for the branches dealing with recovery.
+         */
+        void alreadyCommitted();

Review comment:
       Also perhaps change the name to be a verb like `signalAlreadyCommitted`?
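
To make the intended call pattern concrete, here is a sketch of a committer driving these `CommitRequest` signals. The `CatalogClient` and its exception are hypothetical stand-ins; only the `CommitRequest` calls come from the interface above:

```java
import org.apache.flink.api.connector.sink2.Committer;

import java.util.Collection;

class CatalogCommitter implements Committer<String> {
    private static final int MAX_RETRIES = 3;
    private final CatalogClient client = new CatalogClient(); // hypothetical external system

    @Override
    public void commit(Collection<CommitRequest<String>> requests) {
        for (CommitRequest<String> request : requests) {
            String committable = request.getCommittable();
            if (client.isAlreadyPublished(committable)) {
                // Recovery path: a previous attempt already went through.
                request.alreadyCommitted();
                continue;
            }
            try {
                client.publish(committable);
            } catch (TransientCatalogException e) {
                if (request.getNumberOfRetries() < MAX_RETRIES) {
                    request.retryLater();
                } else {
                    request.failedWithKnownReason(e);
                }
            } catch (RuntimeException e) {
                request.failedWithUnknownReason(e);
            }
        }
    }

    @Override
    public void close() {}

    // --- hypothetical external system below ---
    static class TransientCatalogException extends RuntimeException {}

    static class CatalogClient {
        boolean isAlreadyPublished(String committable) { return false; }
        void publish(String committable) {}
    }
}
```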

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
+ * actually commits the data. To facilitate the separation the {@link SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
+ * validated eagerly. The respective sink writers and committers are transient and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Create a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of

Review comment:
       `Create` -> `Creates`?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+import java.io.IOException;
+
+/**
+ * The {@code SinkWriter} is responsible for writing data.
+ *
+ * @param <InputT> The type of the sink writer's input
+ */
+@PublicEvolving
+public interface SinkWriter<InputT> extends AutoCloseable {
+
+    /**
+     * Add an element to the writer.

Review comment:
       Also change to `Adds`? The same applies to the following comments.

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. Starts at 0 for the first
+         * attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void failedWithKnownReason(Throwable t);
+
+        /**
+         * The commit failed for unknown reason and should not be retried.
+         *
+         * <p>Currently calling this method fails the job. In the future the behaviour might be
+         * configurable.
+         */
+        void failedWithUnknownReason(Throwable t);
+
+        /**
+         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+         * permanently fail after reaching that maximum. Else the committable will be retried as
+         * long as this method is invoked after each attempt.
+         */
+        void retryLater();
+
+        /**
+         * Updates the underlying committable and retries later (see {@link #retryLater()} for a
+         * description). This method can be used if a committable partially succeeded.
+         */
+        void updateAndRetryLater(CommT committable);
+
+        /**
+         * Signals that a committable is skipped as it was committed already in a previous run. Use

Review comment:
       `use` -> `using`?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Create a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+
+    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
+    @PublicEvolving
+    interface InitContext {
+        /** The first checkpoint id when an application is started. */
+        long INITIAL_CHECKPOINT_ID = 1;

Review comment:
       Hi @fapaul, could you also elaborate a bit on the usage of this variable~? On startup the checkpoint id does not necessarily start from 1, for example when the job is recovered from a previous checkpoint~
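
A sketch of the presumed interplay on recovery, assuming the `InitContext` also exposes an `OptionalLong`-returning accessor for the restored checkpoint id (the name `getRestoredCheckpointId` below is an assumption hinted at by the `OptionalLong` import; the constant then covers the fresh-start case):

```java
import org.apache.flink.api.connector.sink2.Sink;

// Sketch only: derive the id of the first checkpoint this writer will see.
final class CheckpointIdSketch {
    static long firstUpcomingCheckpointId(Sink.InitContext context) {
        return context.getRestoredCheckpointId()                     // present when recovering
                        .orElse(Sink.InitContext.INITIAL_CHECKPOINT_ID - 1) // fresh start
                + 1;
    }
}
```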




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org