Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/12 21:25:17 UTC

[GitHub] [flink] JingGe commented on a change in pull request #18302: [FLINK-25569][core] Add decomposed Sink V2 interface

JingGe commented on a change in pull request #18302:
URL: https://github.com/apache/flink/pull/18302#discussion_r783350027



##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a two-phase commit
+ * protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during the commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link
+ * CommitRequest#signalAlreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commits the given collection of committables.
+     *
+     * @param committables A collection of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT> The type of information needed to commit the staged data
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how many times this particular committable has been retried. Starts at 0 for the
+         * first attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for a known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void signalFailedWithKnownReason(Throwable t);
+
+        /**
+         * The commit failed for an unknown reason and should not be retried.
+         *
+         * <p>Currently calling this method fails the job. In the future the behaviour might be
+         * configurable.
+         */
+        void signalFailedWithUnknownReason(Throwable t);
+
+        /**
+         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+         * permanently fail after reaching that maximum. Else the committable will be retried as
+         * long as this method is invoked after each attempt.
+         */
+        void retryLater();

Review comment:
       I am not sure I understood this correctly after reading the Javadoc. Does it mean that this method will keep being called as long as the maximum is not exceeded? The name `retryLater` sounds like an asynchronous call; is that your intention? If so, the follow-up question is: how much later? Since the method takes no parameters, will the delay be controlled by configuration?
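
One reading of the Javadoc quoted above, offered as an assumption rather than the author's answer: the committer calls `retryLater()` on a `CommitRequest` to flag a retriable failure, and the caller (the framework) decides when to re-submit it, which is why the method needs no delay argument. The sketch illustrates that division of labour. `CommitRequest` and `Committer` are simplified local copies of the interfaces in the diff (`signalAlreadyCommitted()` is referenced in the class Javadoc, but its declaration falls outside the quoted hunk). `FakeStore`, `TxCommitter`, `SimpleRequest`, `RetryDriver`, and the `maxRetries` budget are hypothetical names for illustration only, and the driver retries immediately because the diff does not say how the real runtime schedules retries.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified local copies of the interfaces in the diff (not the real flink-core types).
interface CommitRequest<CommT> {
    CommT getCommittable();
    int getNumberOfRetries();
    void signalFailedWithUnknownReason(Throwable t);
    void retryLater();
    void signalAlreadyCommitted();
}

interface Committer<CommT> extends AutoCloseable {
    void commit(Collection<CommitRequest<CommT>> requests) throws IOException, InterruptedException;
}

// Hypothetical external system: the first `transientFailures` attempts throw a retriable
// IOException; successfully committed transaction IDs are remembered.
class FakeStore {
    final Set<String> committed = new HashSet<>();
    int transientFailures;
    FakeStore(int transientFailures) { this.transientFailures = transientFailures; }
    void commit(String txId) throws IOException {
        if (transientFailures-- > 0) throw new IOException("temporarily unavailable");
        committed.add(txId);
    }
}

// The committer only classifies each outcome; it never sleeps or loops on its own.
class TxCommitter implements Committer<String> {
    private final FakeStore store;
    TxCommitter(FakeStore store) { this.store = store; }
    @Override
    public void commit(Collection<CommitRequest<String>> requests) {
        for (CommitRequest<String> request : requests) {
            if (store.committed.contains(request.getCommittable())) {
                request.signalAlreadyCommitted(); // replayed after recovery: no side effect
                continue;
            }
            try {
                store.commit(request.getCommittable());
            } catch (IOException e) {
                request.retryLater(); // retriable: the caller decides when to retry
            } catch (RuntimeException e) {
                request.signalFailedWithUnknownReason(e); // not retriable
            }
        }
    }
    @Override
    public void close() {}
}

enum Outcome { PENDING, COMMITTED, ALREADY_COMMITTED, RETRY, FAILED }

// Hypothetical stand-in for the framework's CommitRequest: it only records the signal.
class SimpleRequest implements CommitRequest<String> {
    final String txId;
    int retries = 0;
    Outcome outcome = Outcome.PENDING;
    SimpleRequest(String txId) { this.txId = txId; }
    public String getCommittable() { return txId; }
    public int getNumberOfRetries() { return retries; }
    public void signalFailedWithUnknownReason(Throwable t) { outcome = Outcome.FAILED; }
    public void retryLater() { outcome = Outcome.RETRY; }
    public void signalAlreadyCommitted() { outcome = Outcome.ALREADY_COMMITTED; }
}

// Hypothetical driver in the framework's role: re-submits requests flagged with
// retryLater() (immediately, no delay) until each settles or maxRetries is spent.
class RetryDriver {
    static List<SimpleRequest> run(Committer<String> committer, List<String> txIds, int maxRetries) {
        List<SimpleRequest> all = new ArrayList<>();
        for (String id : txIds) all.add(new SimpleRequest(id));
        List<SimpleRequest> pending = new ArrayList<>(all);
        while (!pending.isEmpty()) {
            try {
                committer.commit(new ArrayList<CommitRequest<String>>(pending));
            } catch (IOException | InterruptedException e) {
                if (e instanceof InterruptedException) Thread.currentThread().interrupt();
                throw new IllegalStateException("commit failed; the job would fail here", e);
            }
            List<SimpleRequest> next = new ArrayList<>();
            for (SimpleRequest r : pending) {
                if (r.outcome == Outcome.PENDING) r.outcome = Outcome.COMMITTED; // no signal: success
                if (r.outcome != Outcome.RETRY) continue;
                if (r.retries >= maxRetries) { r.outcome = Outcome.FAILED; continue; } // budget spent
                r.retries++;
                r.outcome = Outcome.PENDING;
                next.add(r);
            }
            pending = next;
        }
        return all;
    }
}
```

Under these assumptions, a store that fails twice and then succeeds ends up `COMMITTED` with `getNumberOfRetries()` equal to 2, while a committable the store already holds is reported through `signalAlreadyCommitted()` instead of being written again.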

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Sink} with a stateful {@link SinkWriter}.
+ *
+ * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated
+ * eagerly. The respective sink writers are transient and will only be created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ */
+@PublicEvolving
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+
+    /**
+     * Create a {@link StatefulSinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Create a {@link StatefulSinkWriter} from a recovered state.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            InitContext context, Collection<WriterStateT> recoveredState) throws IOException;
+
+    /**
+     * Any stateful sink needs to provide this state serializer and implement {@link
+     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
+     * #restoreWriter(InitContext, Collection)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

Review comment:
       `Optional` has been removed from the return types of several methods, and this is one of them. Could you explain your reasoning a bit more?
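
One possible rationale, assumed here rather than taken from the PR: in the V1 `Sink` interface every sink had to implement `getWriterStateSerializer()`, and a stateless sink returned `Optional.empty()`. With the decomposed interfaces, a sink opts into writer state by implementing `StatefulSink`, so the accessor can return the serializer directly and the "is there writer state?" question becomes a type check. The sketch models that idea with simplified, hypothetical local types: `StateSerializer` stands in for `SimpleVersionedSerializer`, and `BaseSink`, `StatefulBaseSink`, `PrintSink`, `OffsetSink`, and `SinkInspector` are illustrative names, not Flink classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Simplified stand-in for SimpleVersionedSerializer (hypothetical local copy).
interface StateSerializer<T> {
    int getVersion();
    byte[] serialize(T obj);
    T deserialize(int version, byte[] serialized);
}

// Simplified stand-ins for the decomposed interfaces: the plain sink has no state accessor;
// only the stateful mix-in declares one, so that accessor does not need to return Optional.
interface BaseSink<InputT> {}

interface StatefulBaseSink<InputT, WriterStateT> extends BaseSink<InputT> {
    StateSerializer<WriterStateT> getWriterStateSerializer();
}

// Hypothetical stateless sink: it simply does not implement the stateful mix-in.
class PrintSink implements BaseSink<String> {}

// Hypothetical stateful sink whose writer state is a UTF-8 string (e.g. an offset).
class OffsetSink implements StatefulBaseSink<String, String> {
    @Override
    public StateSerializer<String> getWriterStateSerializer() {
        return new StateSerializer<String>() {
            @Override public int getVersion() { return 1; }
            @Override public byte[] serialize(String s) { return s.getBytes(StandardCharsets.UTF_8); }
            @Override public String deserialize(int version, byte[] bytes) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        };
    }
}

// Hypothetical framework-side lookup: presence of writer state is decided by a type check
// in one place instead of by an Optional returned from every sink.
class SinkInspector {
    static Optional<StateSerializer<?>> writerStateSerializer(BaseSink<?> sink) {
        if (sink instanceof StatefulBaseSink) {
            StateSerializer<?> serializer = ((StatefulBaseSink<?, ?>) sink).getWriterStateSerializer();
            return Optional.of(serializer);
        }
        return Optional.empty();
    }
}
```

With these types, `SinkInspector.writerStateSerializer(new PrintSink())` is empty, while an `OffsetSink` yields a version-1 serializer that round-trips its string state. The trade-off in this sketch is that `Optional` handling moves from every sink implementation to a single type check in the (hypothetical) framework code.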




-- 
This is an automated message from the Apache Git Service.