Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/13 08:24:04 UTC

[flink] 06/06: [FLINK-25569][core] Add decomposed Sink V2 interface.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06192741d02714499c6d0f671b21bfa2588a21bf
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Dec 10 16:11:17 2021 +0100

    [FLINK-25569][core] Add decomposed Sink V2 interface.
    
    The new interface separates concerns and will make future refactorings and extensions easier. The user immediately sees which methods need to be implemented.
---
 .../flink/api/connector/sink2/Committer.java       | 103 +++++++++++++++++++
 .../org/apache/flink/api/connector/sink2/Sink.java | 109 +++++++++++++++++++++
 .../flink/api/connector/sink2/SinkWriter.java      |  72 ++++++++++++++
 .../flink/api/connector/sink2/StatefulSink.java    |  97 ++++++++++++++++++
 .../connector/sink2/TwoPhaseCommittingSink.java    |  80 +++++++++++++++
 5 files changed, 461 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
new file mode 100644
index 0000000..c51cce2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a two-phase commit
+ * protocol.
+ *
+ * <p>A commit must be idempotent: If a failure occurs in Flink during the commit phase, Flink
+ * will restart from the previous checkpoint and re-attempt to commit all committables. Thus,
+ * some or all committables may have already been committed. These {@link CommitRequest}s must
+ * not change the external system, and implementers are asked to signal {@link
+ * CommitRequest#signalAlreadyCommitted()} for them.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commits the given committables, each wrapped in a {@link CommitRequest}.
+     *
+     * @param committables A collection of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT> The type of information needed to commit the staged data
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how many times this particular committable has been retried. Starts at 0 for the
+         * first attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for a known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void signalFailedWithKnownReason(Throwable t);
+
+        /**
+         * The commit failed for an unknown reason and should not be retried.
+         *
+         * <p>Currently calling this method fails the job. In the future the behaviour might be
+         * configurable.
+         */
+        void signalFailedWithUnknownReason(Throwable t);
+
+        /**
+         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+         * permanently fail after reaching that maximum. Otherwise, the committable will be
+         * retried as long as this method is invoked after each attempt.
+         */
+        void retryLater();
+
+        /**
+         * Updates the underlying committable and retries later (see {@link #retryLater()} for a
+         * description). This method can be used if a committable partially succeeded.
+         */
+        void updateAndRetryLater(CommT committable);
+
+        /**
+         * Signals that a committable is skipped as it was committed already in a previous run.
+         * Using this method is optional but eases bookkeeping and debugging. It also serves as a
+         * code documentation for the branches dealing with recovery.
+         */
+        void signalAlreadyCommitted();
+    }
+}
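
For illustration, a minimal sketch of a Committer against the interface above; it is not part of the commit, and the ExternalStore helper is a hypothetical stand-in for a real client of the target system. It only shows where the CommitRequest signals come into play.

import org.apache.flink.api.connector.sink2.Committer;

import java.io.IOException;
import java.util.Collection;

public class SketchCommitter implements Committer<String> {

    /** Hypothetical client of the target system; not part of this commit. */
    interface ExternalStore {
        boolean isCommitted(String committable);

        void commit(String committable) throws IOException;
    }

    private final ExternalStore store;

    public SketchCommitter(ExternalStore store) {
        this.store = store;
    }

    @Override
    public void commit(Collection<CommitRequest<String>> committables)
            throws IOException, InterruptedException {
        for (CommitRequest<String> request : committables) {
            String committable = request.getCommittable();
            if (store.isCommitted(committable)) {
                // Idempotence: this committable was already applied before a restart.
                request.signalAlreadyCommitted();
            } else {
                try {
                    store.commit(committable);
                } catch (IOException e) {
                    // Treat I/O problems as transient and ask Flink to retry this request.
                    request.retryLater();
                }
            }
        }
    }

    @Override
    public void close() {}
}
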
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
new file mode 100644
index 0000000..4052a6c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Creates a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+
+    /** The interface exposes runtime information for creating a {@link SinkWriter}. */
+    @PublicEvolving
+    interface InitContext {
+        /**
+         * The first checkpoint id when an application is started and not recovered from a
+         * previously taken checkpoint or savepoint.
+         */
+        long INITIAL_CHECKPOINT_ID = 1;
+
+        /**
+         * Gets the {@link UserCodeClassLoader} to load classes that are not on the system
+         * classpath, but are part of the jar file of a user job.
+         *
+         * @see UserCodeClassLoader
+         */
+        UserCodeClassLoader getUserCodeClassLoader();
+
+        /**
+         * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
+         * thread in between record processing.
+         *
+         * <p>Note that this method should not be used per-record for performance reasons in the
+         * same way as records should not be sent to the external system individually. Rather,
+         * implementers are expected to batch records and only enqueue a single {@link Runnable} per
+         * batch to handle the result.
+         */
+        MailboxExecutor getMailboxExecutor();
+
+        /**
+         * Returns a {@link ProcessingTimeService} that can be used to get the current time and
+         * register timers.
+         */
+        ProcessingTimeService getProcessingTimeService();
+
+        /** @return The id of the subtask that runs the writer. */
+        int getSubtaskId();
+
+        /** @return The number of parallel Sink tasks. */
+        int getNumberOfParallelSubtasks();
+
+        /** @return The metric group this writer belongs to. */
+        SinkWriterMetricGroup metricGroup();
+
+        /**
+         * Returns the id of the restored checkpoint, if state was restored from the snapshot of a
+         * previous execution.
+         */
+        OptionalLong getRestoredCheckpointId();
+
+        /**
+         * Provides a view on this context as a {@link SerializationSchema.InitializationContext}.
+         */
+        SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();
+    }
+}
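
For illustration, a minimal sketch of a stateless sink built on the interface above; it is not part of the commit. The sink itself carries only serializable configuration, the writer is created per subtask via the InitContext, and standard output stands in for a real external system.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class StdOutSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) throws IOException {
        // Runtime information such as the subtask id is taken from the InitContext.
        return new StdOutWriter(context.getSubtaskId());
    }

    private static class StdOutWriter implements SinkWriter<String> {
        private final int subtaskId;
        private final List<String> buffer = new ArrayList<>();

        StdOutWriter(int subtaskId) {
            this.subtaskId = subtaskId;
        }

        @Override
        public void write(String element, Context context) {
            buffer.add(element); // batch records instead of emitting them one by one
        }

        @Override
        public void flush(boolean endOfInput) {
            // Called on each checkpoint and at end of input: at-least-once delivery.
            buffer.forEach(line -> System.out.println(subtaskId + "> " + line));
            buffer.clear();
        }

        @Override
        public void close() {}
    }
}
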
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
new file mode 100644
index 0000000..9c3394d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+import java.io.IOException;
+
+/**
+ * The {@code SinkWriter} is responsible for writing data.
+ *
+ * @param <InputT> The type of the sink writer's input
+ */
+@PublicEvolving
+public interface SinkWriter<InputT> extends AutoCloseable {
+
+    /**
+     * Adds an element to the writer.
+     *
+     * @param element The input record
+     * @param context The additional information about the input record
+     * @throws IOException if the writer fails to add the element.
+     */
+    void write(InputT element, Context context) throws IOException, InterruptedException;
+
+    /**
+     * Called on checkpoint or end of input so that the writer can flush all pending data for
+     * at-least-once delivery.
+     */
+    void flush(boolean endOfInput) throws IOException, InterruptedException;
+
+    /**
+     * Adds a watermark to the writer.
+     *
+     * <p>This method is intended for advanced sinks that propagate watermarks.
+     *
+     * @param watermark The watermark.
+     * @throws IOException if the writer fails to add the watermark.
+     */
+    default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
+
+    /** Context that {@link #write} can use for getting additional data about an input record. */
+    @PublicEvolving
+    interface Context {
+
+        /** Returns the current event-time watermark. */
+        long currentWatermark();
+
+        /**
+         * Returns the timestamp of the current input record or {@code null} if the element does not
+         * have an assigned timestamp.
+         */
+        Long timestamp();
+    }
+}
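
For illustration, a minimal sketch of a SinkWriter that uses the per-record Context and the optional writeWatermark hook; it is not part of the commit, and standard output again stands in for a real system.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.util.ArrayList;
import java.util.List;

public class TimestampedWriter implements SinkWriter<String> {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element, Context context) {
        // Context#timestamp() may be null if the record carries no timestamp.
        Long timestamp = context.timestamp();
        buffer.add(timestamp == null ? element : timestamp + " " + element);
    }

    @Override
    public void flush(boolean endOfInput) {
        // Flush on checkpoint/end of input; only now is the "external system" contacted.
        buffer.forEach(System.out::println);
        buffer.clear();
    }

    @Override
    public void writeWatermark(Watermark watermark) {
        // Advanced sinks can propagate watermarks; here they are simply recorded.
        buffer.add("watermark " + watermark.getTimestamp());
    }

    @Override
    public void close() {}
}
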
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
new file mode 100644
index 0000000..a181466
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Sink} with a stateful {@link SinkWriter}.
+ *
+ * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated
+ * eagerly. The respective sink writers are transient and will only be created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ */
+@PublicEvolving
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link StatefulSinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link StatefulSinkWriter} from a recovered state.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            InitContext context, Collection<WriterStateT> recoveredState) throws IOException;
+
+    /**
+     * Any stateful sink needs to provide this state serializer and implement {@link
+     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
+     * #restoreWriter(InitContext, Collection)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
+
+    /**
+     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
+     * state to this sink.
+     */
+    @PublicEvolving
+    interface WithCompatibleState {
+        /**
+         * A list of state names of sinks from which the state can be restored. For example, the new
+         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
+         * drop-in replacement when resuming from a checkpoint/savepoint.
+         */
+        Collection<String> getCompatibleWriterStateNames();
+    }
+
+    /**
+     * A {@link SinkWriter} whose state needs to be checkpointed.
+     *
+     * @param <InputT> The type of the sink writer's input
+     * @param <WriterStateT> The type of the writer's state
+     */
+    @PublicEvolving
+    interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
+        /**
+         * @return The writer's state.
+         * @throws IOException if the writer fails to snapshot its state.
+         */
+        List<WriterStateT> snapshotState(long checkpointId) throws IOException;
+    }
+}
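
For illustration, a minimal sketch of a StatefulSink; it is not part of the commit. The writer snapshots its unflushed buffer so records are not lost when the job restarts between checkpoints, and the restored state is handed back via restoreWriter.

import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class BufferingSink implements StatefulSink<String, String> {

    @Override
    public StatefulSinkWriter<String, String> createWriter(InitContext context) {
        return new BufferingWriter(new ArrayList<>());
    }

    @Override
    public StatefulSinkWriter<String, String> restoreWriter(
            InitContext context, Collection<String> recoveredState) {
        // The recovered buffer is handed back to the writer so it can resend the records.
        return new BufferingWriter(new ArrayList<>(recoveredState));
    }

    @Override
    public SimpleVersionedSerializer<String> getWriterStateSerializer() {
        return new SimpleVersionedSerializer<String>() {
            @Override
            public int getVersion() {
                return 1;
            }

            @Override
            public byte[] serialize(String state) {
                return state.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String deserialize(int version, byte[] serialized) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
        };
    }

    private static class BufferingWriter implements StatefulSinkWriter<String, String> {
        private final List<String> buffer;

        BufferingWriter(List<String> initialBuffer) {
            this.buffer = initialBuffer;
        }

        @Override
        public void write(String element, Context context) {
            buffer.add(element);
        }

        @Override
        public void flush(boolean endOfInput) {
            buffer.forEach(System.out::println); // stand-in for the external system
            buffer.clear();
        }

        @Override
        public List<String> snapshotState(long checkpointId) {
            // Whatever is still buffered becomes writer state and is restored on recovery.
            return new ArrayList<>(buffer);
        }

        @Override
        public void close() {}
    }
}
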
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
new file mode 100644
index 0000000..7fcb159
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
+ * actually commits the data. To facilitate the separation, the {@link SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
+ * validated eagerly. The respective sink writers and committers are transient and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
+    @PublicEvolving
+    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
+        /**
+         * Prepares for a commit.
+         *
+         * <p>This method will be called after {@link #flush(boolean)} and before {@link
+         * StatefulSinkWriter#snapshotState(long)}.
+         *
+         * @return The data to commit as the second step of the two-phase commit protocol.
+         * @throws IOException if the writer fails to prepare the commit.
+         */
+        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
+    }
+}
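
For illustration, a minimal sketch of a TwoPhaseCommittingSink; it is not part of the commit. The writer stages records and hands out hypothetical transaction ids as committables, and the committer makes them visible; all external-system calls are placeholders.

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class TransactionalSink implements TwoPhaseCommittingSink<String, String> {

    @Override
    public PrecommittingSinkWriter<String, String> createWriter(InitContext context) {
        return new TransactionalWriter();
    }

    @Override
    public Committer<String> createCommitter() {
        // Each committable is a transaction id; committing it publishes the staged data.
        return new Committer<String>() {
            @Override
            public void commit(Collection<CommitRequest<String>> requests) {
                for (CommitRequest<String> request : requests) {
                    // Placeholder for the real commit call against the external system.
                    System.out.println("committing transaction " + request.getCommittable());
                }
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        // Committables must survive failover, so they are serialized into checkpoints.
        return new SimpleVersionedSerializer<String>() {
            @Override
            public int getVersion() {
                return 1;
            }

            @Override
            public byte[] serialize(String committable) {
                return committable.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String deserialize(int version, byte[] serialized) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
        };
    }

    private static class TransactionalWriter implements PrecommittingSinkWriter<String, String> {
        private final List<String> stagedRecords = new ArrayList<>();
        private int transactionCounter;

        @Override
        public void write(String element, Context context) {
            stagedRecords.add(element); // stage the record in the open "transaction"
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public Collection<String> prepareCommit() {
            // First phase: close the current transaction and hand its id to the committer.
            stagedRecords.clear();
            return Collections.singletonList("txn-" + transactionCounter++);
        }

        @Override
        public void close() {}
    }
}
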