You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/13 23:17:55 UTC

[GitHub] [flink] JingGe commented on a change in pull request #18330: [FLINK-25570][streaming] Add topology extension points to Sink V2

JingGe commented on a change in pull request #18330:
URL: https://github.com/apache/flink/pull/18330#discussion_r784386592



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.Objects;
+
+/** The message sent from {@link SinkWriter} to {@link Committer}. */
+@Experimental
+public class CommittableMessageTypeInfo<CommT> extends TypeInformation<CommittableMessage<CommT>> {
+
+    private final SerializableSupplier<SimpleVersionedSerializer<CommT>>
+            committableSerializerFactory;
+
+    private CommittableMessageTypeInfo(
+            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory) {
+        this.committableSerializerFactory = committableSerializerFactory;
+    }
+
+    /**
+     * Returns the type information based on the serializer for a {@link CommittableMessage}.
+     *
+     * @param committableSerializerFactory factory to create the serializer for a {@link
+     *     CommittableMessage}
+     * @param <CommT> type of the committable
+     * @return the type information for a {@link CommittableMessage}
+     */
+    public static <CommT>
+            TypeInformation<CommittableMessage<CommT>> forCommittableSerializerFactory(

Review comment:
       NIT: 
   public static <CommT>
               TypeInformation<CommittableMessage<CommT>> for(SerializableSupplier<SimpleVersionedSerializer<CommT>>
                               committableSerializerFactory)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.util.OptionalLong;
+
+/**
+ * This class tracks the information about committables belonging to one checkpoint coming from one
+ * subtask.
+ *
+ * <p>It is sent to down-stream consumers to depict the progress of the committing process.
+ *
+ * @param <CommT> type of the committable
+ */
+@Experimental
+public class CommittableSummary<CommT> implements CommittableMessage<CommT> {

Review comment:
       It seems that <CommT> is unused.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.Objects;
+
+/** The message sent from {@link SinkWriter} to {@link Committer}. */

Review comment:
       The Javadoc is identical to that of the interface CommittableMessage<CommT>.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+import java.util.OptionalLong;
+
+/**
+ * This class tracks the information about committables belonging to one checkpoint coming from one
+ * subtask.
+ *
+ * <p>It is sent to down-stream consumers to depict the progress of the committing process.
+ *
+ * @param <CommT> type of the committable
+ */
+@Experimental
+public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
+    private final int subtaskId;
+    /** May change after recovery. */
+    private final int numberOfSubtasks;
+
+    @Nullable private final Long checkpointId;
+    /** The number of committables coming from the given subtask in the particular checkpoint. */
+    private final int numberOfCommittables;
+    /** The number of committables that have not been successfully committed. */
+    private final int numberOfPendingCommittables;
+    /** The number of committables that have failed and are not retried. */
+    private final int numberOfFailedCommittables;
+
+    public CommittableSummary(

Review comment:
       Providing a builder would help users avoid mistakes, since there are 5 parameters of type int in the method signature.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Allows expert users to implement a custom topology after {@link Committer}.
+ *
+ * <p>It is recommended to use immutable committables because mutating committables can have
+ * unexpected side-effects.
+ */
+@Experimental
+public interface WithPostCommitTopology<InputT, CommT>
+        extends TwoPhaseCommittingSink<InputT, CommT> {
+
+    /**
+     * Adds a custom post-commit topology where all committables can be processed.
+     *
+     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
+     * modes do not require special cases.
+     *
+     * <p>All operations need to be idempotent: on recovery, any number of committables may be
+     * replayed that have already been committed. It's mandatory that these committables have no
+     * effect on the external system.
+     */

Review comment:
       
   ```suggestion
         * @param committables the stream of committables.
         */
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org