You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/13 08:24:04 UTC
[flink] 06/06: [FLINK-25569][core] Add decomposed Sink V2 interface.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06192741d02714499c6d0f671b21bfa2588a21bf
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Dec 10 16:11:17 2021 +0100
[FLINK-25569][core] Add decomposed Sink V2 interface.
The new interface separates concerns and will make future refactorings and extensions easier. The user immediately sees which methods need to be implemented.
---
.../flink/api/connector/sink2/Committer.java | 103 +++++++++++++++++++
.../org/apache/flink/api/connector/sink2/Sink.java | 109 +++++++++++++++++++++
.../flink/api/connector/sink2/SinkWriter.java | 72 ++++++++++++++
.../flink/api/connector/sink2/StatefulSink.java | 97 ++++++++++++++++++
.../connector/sink2/TwoPhaseCommittingSink.java | 80 +++++++++++++++
5 files changed, 461 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
new file mode 100644
index 0000000..c51cce2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a two-phase commit
+ * protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during the commit phase, Flink
+ * will restart from the previous checkpoint and re-attempt to commit all committables. Thus, some
+ * or all committables may have already been committed. These {@link CommitRequest}s must not
+ * change the external system and implementers are asked to signal {@link
+ * CommitRequest#signalAlreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+ /**
+ * Commits the given collection of {@link CommitRequest}s.
+ *
+ * @param committables A collection of commit requests staged by the sink writer.
+ * @throws IOException for reasons that may yield a complete restart of the job.
+ */
+ void commit(Collection<CommitRequest<CommT>> committables)
+ throws IOException, InterruptedException;
+
+ /**
+ * A request to commit a specific committable.
+ *
+ * @param <CommT> The type of the committable carried by this request.
+ */
+ @PublicEvolving
+ interface CommitRequest<CommT> {
+
+ /** Returns the committable. */
+ CommT getCommittable();
+
+ /**
+ * Returns how many times this particular committable has been retried. Starts at 0 for the
+ * first attempt.
+ */
+ int getNumberOfRetries();
+
+ /**
+ * The commit failed for a known reason and should not be retried.
+ *
+ * <p>Currently calling this method only logs the error, discards the committable and
+ * continues. In the future the behaviour might be configurable.
+ */
+ void signalFailedWithKnownReason(Throwable t);
+
+ /**
+ * The commit failed for an unknown reason and should not be retried.
+ *
+ * <p>Currently calling this method fails the job. In the future the behaviour might be
+ * configurable.
+ */
+ void signalFailedWithUnknownReason(Throwable t);
+
+ /**
+ * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+ * permanently fail after reaching that maximum. Else the committable will be retried as
+ * long as this method is invoked after each attempt.
+ */
+ void retryLater();
+
+ /**
+ * Updates the underlying committable and retries later (see {@link #retryLater()} for a
+ * description). This method can be used if a committable partially succeeded.
+ */
+ void updateAndRetryLater(CommT committable);
+
+ /**
+ * Signals that a committable is skipped as it was committed already in a previous run.
+ * Using this method is optional but eases bookkeeping and debugging. It also serves as
+ * code documentation for the branches dealing with recovery.
+ */
+ void signalAlreadyCommitted();
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
new file mode 100644
index 0000000..4052a6c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+ /**
+ * Creates a {@link SinkWriter}.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+ SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+
+ /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
+ @PublicEvolving
+ interface InitContext {
+ /**
+ * The first checkpoint id when an application is started and not recovered from a
+ * previously taken checkpoint or savepoint.
+ */
+ long INITIAL_CHECKPOINT_ID = 1;
+
+ /**
+ * Gets the {@link UserCodeClassLoader} to load classes that are not in the system's
+ * classpath, but are part of the jar file of a user job.
+ *
+ * @see UserCodeClassLoader
+ */
+ UserCodeClassLoader getUserCodeClassLoader();
+
+ /**
+ * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
+ * thread in between record processing.
+ *
+ * <p>Note that this method should not be used per-record for performance reasons in the
+ * same way as records should not be sent to the external system individually. Rather,
+ * implementers are expected to batch records and only enqueue a single {@link Runnable} per
+ * batch to handle the result.
+ */
+ MailboxExecutor getMailboxExecutor();
+
+ /**
+ * Returns a {@link ProcessingTimeService} that can be used to get the current time and
+ * register timers.
+ */
+ ProcessingTimeService getProcessingTimeService();
+
+ /** @return The id of the subtask where the writer is. */
+ int getSubtaskId();
+
+ /** @return The number of parallel Sink tasks. */
+ int getNumberOfParallelSubtasks();
+
+ /** @return The metric group this writer belongs to. */
+ SinkWriterMetricGroup metricGroup();
+
+ /**
+ * Returns the id of the restored checkpoint, if state was restored from the snapshot of a
+ * previous execution.
+ */
+ OptionalLong getRestoredCheckpointId();
+
+ /**
+ * Provides a view on this context as a {@link SerializationSchema.InitializationContext}.
+ */
+ SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
new file mode 100644
index 0000000..9c3394d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+import java.io.IOException;
+
+/**
+ * The {@code SinkWriter} is responsible for writing data.
+ *
+ * @param <InputT> The type of the sink writer's input
+ */
+@PublicEvolving
+public interface SinkWriter<InputT> extends AutoCloseable {
+
+ /**
+ * Adds an element to the writer.
+ *
+ * @param element The input record
+ * @param context The additional information about the input record
+ * @throws IOException if it fails to add an element.
+ */
+ void write(InputT element, Context context) throws IOException, InterruptedException;
+
+ /**
+ * Called on checkpoint or end of input so that the writer flushes all pending data for
+ * at-least-once consistency.
+ */
+ void flush(boolean endOfInput) throws IOException, InterruptedException;
+
+ /**
+ * Adds a watermark to the writer.
+ *
+ * <p>This method is intended for advanced sinks that propagate watermarks.
+ *
+ * @param watermark The watermark.
+ * @throws IOException if it fails to add a watermark.
+ */
+ default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
+
+ /** Context that {@link #write} can use for getting additional data about an input record. */
+ @PublicEvolving
+ interface Context {
+
+ /** Returns the current event-time watermark. */
+ long currentWatermark();
+
+ /**
+ * Returns the timestamp of the current input record or {@code null} if the element does not
+ * have an assigned timestamp.
+ */
+ Long timestamp();
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
new file mode 100644
index 0000000..a181466
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Sink} with a stateful {@link SinkWriter}.
+ *
+ * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated
+ * eagerly. The respective sink writers are transient and will only be created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ */
+@PublicEvolving
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+
+ /**
+ * Creates a {@link StatefulSinkWriter}.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+ StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;
+
+ /**
+ * Creates a {@link StatefulSinkWriter} from the given {@code recoveredState}.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+ StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+ InitContext context, Collection<WriterStateT> recoveredState) throws IOException;
+
+ /**
+ * Any stateful sink needs to provide this state serializer and implement {@link
+ * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
+ * #restoreWriter(InitContext, Collection)} on recovery.
+ *
+ * @return the serializer of the writer's state type.
+ */
+ SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
+
+ /**
+ * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
+ * state to this sink.
+ */
+ @PublicEvolving
+ interface WithCompatibleState {
+ /**
+ * A collection of state names of sinks from which the state can be restored. For example,
+ * the new {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as
+ * a drop-in replacement when resuming from a checkpoint/savepoint.
+ */
+ Collection<String> getCompatibleWriterStateNames();
+ }
+
+ /**
+ * A {@link SinkWriter} whose state needs to be checkpointed.
+ *
+ * @param <InputT> The type of the sink writer's input
+ * @param <WriterStateT> The type of the writer's state
+ */
+ @PublicEvolving
+ interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
+ /**
+ * @return The writer's state.
+ * @throws IOException if it fails to snapshot the writer's state.
+ */
+ List<WriterStateT> snapshotState(long checkpointId) throws IOException;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
new file mode 100644
index 0000000..7fcb159
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
+ * actually commits the data. To facilitate the separation the {@link SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
+ * validated eagerly. The respective sink writers and committers are transient and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+ /**
+ * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+ * input.
+ *
+ * @param context the runtime context.
+ * @return A sink writer for the two-phase commit protocol.
+ * @throws IOException for any failure during creation.
+ */
+ PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+ /**
+ * Creates a {@link Committer} that permanently makes the previously written data visible
+ * through {@link Committer#commit(Collection)}.
+ *
+ * @return A committer for the two-phase commit protocol.
+ * @throws IOException for any failure during creation.
+ */
+ Committer<CommT> createCommitter() throws IOException;
+
+ /** Returns the serializer of the committable type. */
+ SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+ /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
+ @PublicEvolving
+ interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
+ /**
+ * Prepares for a commit.
+ *
+ * <p>This method will be called after {@link #flush(boolean)} and before {@link
+ * StatefulSinkWriter#snapshotState(long)}.
+ *
+ * @return The data to commit as the second step of the two-phase commit protocol.
+ * @throws IOException if it fails to prepare for a commit.
+ */
+ Collection<CommT> prepareCommit() throws IOException, InterruptedException;
+ }
+}