Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/25 10:12:28 UTC

[GitHub] [flink] fapaul commented on a change in pull request #17536: [FLINK-24530][datastream] GlobalCommitter might not commit all records on drain

fapaul commented on a change in pull request #17536:
URL: https://github.com/apache/flink/pull/17536#discussion_r735371614



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java
##########
@@ -17,15 +17,19 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.connector.sink.Committer;
+
 import java.io.IOException;
 import java.util.List;
 
 /**
  * This committer handler simply forwards all committables downstream. It's used in {@link
  * SinkOperator} without committers but with downstream operators (in streaming, only global
  * committer on sink; in batch, committer or global committer present).
+ *
+ * @param <CommT> The input and output type of the {@link Committer}.
  */
-class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, CommT, CommT> {
+class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, Void> {

Review comment:
       I am not sure whether the abstraction of `ForwardCommittingHandler` and `NoopCommittingHandler` really makes sense anymore after the refactoring. In the end, it could be a simple boolean flag that indicates whether the committables should be sent downstream when there is a global committer.
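
A rough sketch of what the flag-based alternative might look like. The class name `FlagBasedHandler`, the field `emitDownstream`, and the method `processCommittables` are hypothetical and are not taken from the PR or from Flink; the sketch only illustrates the idea of one handler whose behavior is switched by a boolean.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical replacement for the forwarding and no-op handlers; not Flink API.
class FlagBasedHandler<CommT> {
    // Assumption: true when a downstream global committer consumes the committables.
    private final boolean emitDownstream;

    FlagBasedHandler(boolean emitDownstream) {
        this.emitDownstream = emitDownstream;
    }

    // Returns the committables to forward, or an empty collection when nothing is downstream.
    Collection<CommT> processCommittables(List<CommT> committables) {
        if (!emitDownstream) {
            return Collections.emptyList();
        }
        return new ArrayList<>(committables);
    }
}
```

With `emitDownstream = true` this behaves like a forwarding handler, and with `false` like a no-op one, so the two classes would collapse into a constructor argument.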

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -65,21 +68,55 @@ protected void recoveredCommittables(List<StateT> recovered) throws IOException
         return all;
     }
 
+    protected final Collection<StateT> commitAndReturnSuccess(List<StateT> committables)
+            throws IOException, InterruptedException {
+        Collection<StateT> failed = commit(committables);
+        if (failed.isEmpty()) {
+            return committables;
+        }
+        // Assume that (Global)Committer#commit does not create a new instance for failed
+        // committables. This assumption is documented in the respective JavaDoc.
+        Set<StateT> successful =
+                Collections.newSetFromMap(new IdentityHashMap<>(committables.size()));
+        successful.addAll(committables);
+        successful.removeAll(failed);
+        return successful;
+    }
+
+    protected final Collection<StateT> commit(List<StateT> committables)
+            throws IOException, InterruptedException {
+        List<StateT> failed = commitInternal(committables);
+        recoveredCommittables(failed);
+        return failed;
+    }
+
+    /**
+     * Commits a list of committables.
+     *
+     * @param committables A list of committables that is ready for committing.
+     * @return A list of committables needed to re-commit.
+     */
+    abstract List<StateT> commitInternal(List<StateT> committables)
+            throws IOException, InterruptedException;
+
     @Override
     public boolean needsRetry() {
         return !recoveredCommittables.isEmpty();
     }
 
     @Override
-    public void retry() throws IOException, InterruptedException {
-        retry(prependRecoveredCommittables(Collections.emptyList()));
+    public Collection<CommT> retry() throws IOException, InterruptedException {
+        return retry(prependRecoveredCommittables(Collections.emptyList()));
     }
 
-    protected abstract void retry(List<StateT> recoveredCommittables)
-            throws IOException, InterruptedException;
+    protected Collection<CommT> retry(List<StateT> recoveredCommittables)

Review comment:
       AFAICT `StreamingCommitterHandler` and `BatchCommitterHandler` both override this method, but their implementations are identical:
   
   ```java
       @Override
       protected Collection<CommT> retry(List<CommT> recoveredCommittables)
               throws IOException, InterruptedException {
           return commitAndReturnSuccess(recoveredCommittables);
       }
   ```
   
   Why can't we move the implementation into the `retry` method of this class?
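
A simplified sketch of the suggestion, under the assumption that the state type and the output type coincide (which holds for the two subclasses quoted above, but may not hold for every handler). `BaseHandlerSketch` and `FailOnceHandler` are hypothetical names, a single type parameter stands in for `CommT`/`StateT`, and checked exceptions are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for AbstractCommitterHandler with the shared retry logic pulled up.
abstract class BaseHandlerSketch<T> {
    private final List<T> recovered = new ArrayList<>();

    // Commits the given committables and returns those that need to be re-committed.
    abstract List<T> commitInternal(List<T> committables);

    // Mirrors commitAndReturnSuccess from the diff: remember failures, return the rest.
    final Collection<T> commitAndReturnSuccess(List<T> committables) {
        List<T> failed = commitInternal(committables);
        recovered.addAll(failed);
        if (failed.isEmpty()) {
            return committables;
        }
        Set<T> successful =
                Collections.newSetFromMap(new IdentityHashMap<>(committables.size()));
        successful.addAll(committables);
        successful.removeAll(failed);
        return successful;
    }

    boolean needsRetry() {
        return !recovered.isEmpty();
    }

    // Single shared implementation, so subclasses no longer need to override it.
    Collection<T> retry() {
        List<T> toRetry = new ArrayList<>(recovered);
        recovered.clear();
        return commitAndReturnSuccess(toRetry);
    }
}

// Demo subclass: fails the committable "b" on the first attempt only.
class FailOnceHandler extends BaseHandlerSketch<String> {
    private boolean failedOnce = false;

    @Override
    List<String> commitInternal(List<String> committables) {
        if (!failedOnce && committables.contains("b")) {
            failedOnce = true;
            return Collections.singletonList("b");
        }
        return Collections.emptyList();
    }
}
```

Subclasses then only implement `commitInternal`; whether this works for handlers whose state type differs from the emitted type is left open here.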

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -51,6 +53,7 @@
                 Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);
+    public static final TypeInformation<byte[]> BYTES = TypeInformation.of(byte[].class);

Review comment:
       Can this be `private`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -21,33 +21,44 @@
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Retries the committables of a {@link CommitterHandler} until all committables are eventually
  * committed.
  */
-class CommitRetrier {
+class CommitRetrier<CommT> {
+    @VisibleForTesting static final int RETRY_DELAY = 1000;
     private final ProcessingTimeService processingTimeService;
-    private final CommitterHandler<?> committerHandler;
+    private final CommitterHandler<CommT> committerHandler;
+    private final ThrowingConsumer<? super Collection<CommT>, IOException> committableConsumer;
     private final Clock clock;
-    @VisibleForTesting static final int RETRY_DELAY = 1000;
 
     public CommitRetrier(
-            ProcessingTimeService processingTimeService, CommitterHandler<?> committerHandler) {
-        this(processingTimeService, committerHandler, SystemClock.getInstance());
+            ProcessingTimeService processingTimeService,
+            CommitterHandler<CommT> committerHandler,
+            ThrowingConsumer<? super Collection<CommT>, IOException> committableConsumer) {

Review comment:
       I don't like passing a lambda here because it is only used for emitting. I think we can simplify it by passing only a boolean that determines whether emitting is necessary.
   
   Also, `CommitterOperator#emitCommittables` currently references the `commiterRetrier`, which I think may lead to weird situations.
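
One possible shape of the boolean-based variant, as a sketch only. `RetrierSketch`, its nested `Handler` interface, `emitDownstream`, and `retryAll` are hypothetical names; the real `CommitRetrier` schedules retries through a `ProcessingTimeService`, which is replaced here by a plain loop to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified retrier: no timers and no Flink types.
class RetrierSketch<CommT> {
    // Minimal stand-in for the two CommitterHandler methods the retrier needs.
    interface Handler<C> {
        boolean needsRetry();

        Collection<C> retry();
    }

    private final Handler<CommT> handler;
    // Assumption: true when committed committables must be sent downstream.
    private final boolean emitDownstream;

    RetrierSketch(Handler<CommT> handler, boolean emitDownstream) {
        this.handler = handler;
        this.emitDownstream = emitDownstream;
    }

    // Retries until the handler has nothing left and returns what the caller should emit.
    List<CommT> retryAll() {
        List<CommT> toEmit = new ArrayList<>();
        while (handler.needsRetry()) {
            Collection<CommT> committed = handler.retry();
            if (emitDownstream) {
                toEmit.addAll(committed);
            }
        }
        return toEmit;
    }
}
```

Because the retrier hands the result back to its caller instead of invoking a callback, the operator's emit method would not need to reference the retrier.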

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -72,4 +78,32 @@ default void snapshotState(StateSnapshotContext context) throws Exception {}
      * @return successfully retried committables that is sent downstream.
      */
     Collection<CommT> retry() throws IOException, InterruptedException;
+
+    /**
+     * The serializable factory of a committer handler such that the stateful implementations of
+     * {@link CommitterHandler} do not need to be {@link Serializable} themselves.
+     */
+    interface Factory<Sink, CommT> extends Serializable {
+        CommitterHandler<CommT> create(Sink sink) throws IOException;
+
+        default <T> T checkSerializerPresent(Optional<T> optional, boolean global) {
+            String scope = global ? " global" : "";
+            checkState(
+                    optional.isPresent(),
+                    "Internal error: a%s committer should only be created if the sink has a%s committable serializer.",
+                    scope,
+                    scope);
+            return optional.get();
+        }
+
+        default <T> T checkCommitterPresent(Optional<T> optional, boolean global) {
+            String scope = global ? " global" : "";
+            checkState(
+                    optional.isPresent(),
+                    "Expected a%s committer because%s committable serializer is set.",
+                    scope,
+                    scope);
+            return optional.get();
+        }
+    }

Review comment:
       Not sure I really like this approach of sharing utility functions, but having them in a separate class probably would not change much.



