Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/10 08:44:00 UTC

[GitHub] [flink] fapaul commented on a change in pull request #18302: [FLINK-25569][core] Add decomposed Sink V2 interface

fapaul commented on a change in pull request #18302:
URL: https://github.com/apache/flink/pull/18302#discussion_r780984277



##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. Starts at 0 for the first
+         * attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for a known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.

Review comment:
This PR only introduces the public-facing API, not the internal implementation. I did this to split the change into more reviewable chunks.

In general, the two failure methods are designed so that failure side channels can be added in the future. In the first version, they will only log the error or fail the job.
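
To make the idempotence contract from the class Javadoc concrete, here is a minimal sketch of how a committer might treat replayed committables. It is not the Flink implementation, which is not part of this PR. `CommitRequest` below is a simplified local stand-in for the interface in the diff; `InMemoryStore`, `RecordingRequest` and the method name `failedWithKnownReason` are hypothetical, since the diff is cut off before the failure method's signature.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class IdempotentCommitSketch {

    /** Simplified local stand-in for the CommitRequest in the diff; NOT the Flink interface. */
    interface CommitRequest<T> {
        T getCommittable();

        /** Named in the class Javadoc of the diff. */
        void alreadyCommitted();

        /** Hypothetical name: the diff ends before the failure method's signature. */
        void failedWithKnownReason(Throwable cause);
    }

    /** Hypothetical external system that remembers which transaction ids were committed. */
    static final class InMemoryStore {
        private final Set<String> committed = new HashSet<>();

        boolean isCommitted(String txId) {
            return committed.contains(txId);
        }

        void commit(String txId) {
            if (txId.isEmpty()) {
                throw new IllegalArgumentException("empty transaction id");
            }
            committed.add(txId);
        }

        int size() {
            return committed.size();
        }
    }

    /** Request implementation that only records which signal it received. */
    static final class RecordingRequest implements CommitRequest<String> {
        private final String txId;
        boolean sawAlreadyCommitted;
        Throwable knownFailure;

        RecordingRequest(String txId) {
            this.txId = txId;
        }

        @Override
        public String getCommittable() {
            return txId;
        }

        @Override
        public void alreadyCommitted() {
            sawAlreadyCommitted = true;
        }

        @Override
        public void failedWithKnownReason(Throwable cause) {
            knownFailure = cause;
        }
    }

    /** Commits each request at most once, so replaying a batch after a restart is harmless. */
    static void commit(InMemoryStore store, Collection<? extends CommitRequest<String>> requests) {
        for (CommitRequest<String> request : requests) {
            String txId = request.getCommittable();
            if (store.isCommitted(txId)) {
                // Replayed committable: leave the external system untouched and signal it.
                request.alreadyCommitted();
                continue;
            }
            try {
                store.commit(txId);
            } catch (IllegalArgumentException e) {
                // Retrying cannot help here, so report a known, non-retriable failure.
                request.failedWithKnownReason(e);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        commit(store, List.of(new RecordingRequest("tx-1"), new RecordingRequest("tx-2")));

        // Simulate a restart from the previous checkpoint: the same committable comes back.
        RecordingRequest replayed = new RecordingRequest("tx-1");
        commit(store, List.of(replayed));

        System.out.println("replay signalled alreadyCommitted: " + replayed.sawAlreadyCommitted);
        System.out.println("committed transactions: " + store.size());
    }
}
```

The sketch checks the external system before committing, so the replayed request changes nothing and only triggers the `alreadyCommitted()` signal. What the framework does with a known-reason failure (log and discard, or fail the job) is left to the internal implementation that follows in a later PR.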



