Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/19 20:04:26 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #13667: [FLINK-19679] Deduplicate code between CheckpointBarrierUnaligner and CheckpointBarrierAligner

rkhachatryan commented on a change in pull request #13667:
URL: https://github.com/apache/flink/pull/13667#discussion_r508019514



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
##########
@@ -39,57 +39,83 @@
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
 
 /**
- * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier
- * and keeping track of the number of received barriers and consumed barriers.
+ * {@link SingleCheckpointBarrierHandler} is used for triggering checkpoint while reading the first barrier
+ * and keeping track of the number of received barriers and consumed barriers. It can handle/track
+ * just single checkpoint at a time. The behaviour when to actually trigger the checkpoint and
+ * what the {@link CheckpointableInput} should do is controlled by {@link CheckpointBarrierBehaviourController}.
  */
 @Internal
 @NotThreadSafe
-public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
+public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 
-	private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierUnaligner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);
 
-	private final String taskName;
+	protected final String taskName;

Review comment:
       Is the `protected` visibility intentional?
   I don't see any subclasses, so I would prefer these to be `private`.
   
   (The same applies to all the other fields.)
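A minimal sketch of how the field block could look with the suggested `private` visibility. `BehaviourController` and `SingleHandlerFieldsSketch` are hypothetical stand-ins, `CompletableFuture.completedFuture(null)` replaces Flink's `FutureUtils.completedVoidFuture()` so the snippet compiles without Flink, and marking the class `final` is an extra assumption that makes the "no subclasses" intent explicit.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for CheckpointBarrierBehaviourController (not the Flink type).
interface BehaviourController {}

// Sketch only: the handler state with every field private. `final` on the class is an
// assumption that documents "no subclasses" at compile time.
final class SingleHandlerFieldsSketch {

    private final String taskName;

    private final BehaviourController controller;

    private int numBarriersReceived;

    /** Ensures only one checkpoint is triggered when the same barrier arrives on several channels. */
    private long currentCheckpointId = -1L;

    private int numOpenChannels;

    private CompletableFuture<Void> allBarriersReceivedFuture = CompletableFuture.completedFuture(null);

    SingleHandlerFieldsSketch(String taskName, BehaviourController controller, int numOpenChannels) {
        this.taskName = taskName;
        this.controller = controller;
        this.numOpenChannels = numOpenChannels;
    }

    // Callers such as tests read state through package-private accessors, not protected fields.
    String getTaskName() {
        return taskName;
    }

    long getCurrentCheckpointId() {
        return currentCheckpointId;
    }
}
```

Accessors like these keep tests working without widening field visibility.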

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * Controls when the checkpoint should be actually triggered.
+ */
+@Internal
+public interface CheckpointBarrierBehaviourController {
+
+	void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier);
+
+	/**
+	 * @return {@code true} if checkpoint should be triggered.
+	 */
+	boolean firstBarrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException;

Review comment:
       Returning a boolean from these two (first/last) methods seems unclear and error-prone to me.
   How about:
   1. introducing `enum CheckpointTriggerStrategy { ON_FIRST_BARRIER, ON_LAST_BARRIER }`,
   2. adding a method `CheckpointBarrierBehaviourController.getTriggerStrategy()`, and
   3. returning `void` from the first/last methods?
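The proposal could look roughly like the sketch below. Only the enum and `getTriggerStrategy` come from the comment; `StrategyController`, `FixedStrategyController` and `StrategyHandler` are hypothetical names, and Flink's `InputChannelInfo`/`CheckpointBarrier` are replaced by a channel index and a checkpoint id. The trigger decision is read from the controller's strategy instead of being encoded in two boolean return values.

```java
import java.util.ArrayList;
import java.util.List;

/** When the handler should trigger the checkpoint (the enum proposed in the comment). */
enum CheckpointTriggerStrategy { ON_FIRST_BARRIER, ON_LAST_BARRIER }

/** Simplified controller: channel index and checkpoint id stand in for the Flink types. */
interface StrategyController {
    CheckpointTriggerStrategy getTriggerStrategy();

    void barrierReceived(int channel, long checkpointId);

    void firstBarrierReceived(int channel, long checkpointId);

    void lastBarrierReceived(int channel, long checkpointId);
}

/** Controller that only reports a fixed strategy; the callbacks are no-ops here. */
final class FixedStrategyController implements StrategyController {
    private final CheckpointTriggerStrategy strategy;

    FixedStrategyController(CheckpointTriggerStrategy strategy) {
        this.strategy = strategy;
    }

    @Override
    public CheckpointTriggerStrategy getTriggerStrategy() {
        return strategy;
    }

    @Override
    public void barrierReceived(int channel, long checkpointId) {}

    @Override
    public void firstBarrierReceived(int channel, long checkpointId) {}

    @Override
    public void lastBarrierReceived(int channel, long checkpointId) {}
}

/** The handler, not the controller's return values, decides when to trigger. */
final class StrategyHandler {
    private final StrategyController controller;
    private final int numOpenChannels;
    private long currentCheckpointId = -1L;
    private int numBarriersReceived;
    /** Ids of triggered checkpoints, recorded instead of notifying a real task. */
    final List<Long> triggered = new ArrayList<>();

    StrategyHandler(StrategyController controller, int numOpenChannels) {
        this.controller = controller;
        this.numOpenChannels = numOpenChannels;
    }

    void processBarrier(int channel, long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return; // barrier of an older checkpoint: ignored in this sketch
        }
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId; // newer checkpoint resets the count (abort handling omitted)
            numBarriersReceived = 0;
        }
        CheckpointTriggerStrategy strategy = controller.getTriggerStrategy();
        if (numBarriersReceived == 0) {
            controller.firstBarrierReceived(channel, checkpointId);
            if (strategy == CheckpointTriggerStrategy.ON_FIRST_BARRIER) {
                triggered.add(checkpointId);
            }
        }
        controller.barrierReceived(channel, checkpointId);
        if (++numBarriersReceived == numOpenChannels) {
            controller.lastBarrierReceived(channel, checkpointId);
            if (strategy == CheckpointTriggerStrategy.ON_LAST_BARRIER) {
                triggered.add(checkpointId);
            }
        }
    }
}
```

With three channels, `ON_FIRST_BARRIER` (the unaligned behaviour) records the checkpoint after the first barrier, while `ON_LAST_BARRIER` (the aligned behaviour) records it only after the third.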

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
##########
@@ -0,0 +1,49 @@
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * Controls when the checkpoint should be actually triggered.
+ */
+@Internal
+public interface CheckpointBarrierBehaviourController {
+
+	void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier);
+
+	/**
+	 * @return {@code true} if checkpoint should be triggered.
+	 */
+	boolean firstBarrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException;
+
+	/**
+	 * @return {@code true} if checkpoint should be triggered.
+	 */
+	boolean lastBarrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException;

Review comment:
       The relation between `lastBarrierReceived` and `barrierReceived` is not very clear: it is not obvious whether `barrierReceived` is also called for the last barrier, and in which order. The same applies to `firstBarrierReceived`.
   
   How about renaming them to `postProcessLastBarrier` and `preProcessFirstBarrier`?
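To make the ordering question concrete, here is a sketch with the proposed names in which the javadoc pins down the contract. The order shown (`preProcessFirstBarrier`, then `barrierReceived` for every barrier, then `postProcessLastBarrier`) is an assumption chosen to match the `pre`/`post` prefixes, not necessarily what the PR implements; `RenamedController`, `RecordingController` and `OrderedDispatcher` are hypothetical, with plain `int`/`long` standing in for the channel and the barrier.

```java
import java.util.ArrayList;
import java.util.List;

/** Controller with the proposed names; the javadoc states the (assumed) call order. */
interface RenamedController {
    /** Called once per checkpoint, before {@link #barrierReceived} for the first barrier. */
    void preProcessFirstBarrier(int channel, long checkpointId);

    /** Called for every barrier of the checkpoint, including the first and the last. */
    void barrierReceived(int channel, long checkpointId);

    /** Called once per checkpoint, after {@link #barrierReceived} for the last barrier. */
    void postProcessLastBarrier(int channel, long checkpointId);
}

/** Records the calls so that the order can be checked. */
final class RecordingController implements RenamedController {
    final List<String> calls = new ArrayList<>();

    @Override
    public void preProcessFirstBarrier(int channel, long checkpointId) {
        calls.add("pre:" + channel);
    }

    @Override
    public void barrierReceived(int channel, long checkpointId) {
        calls.add("barrier:" + channel);
    }

    @Override
    public void postProcessLastBarrier(int channel, long checkpointId) {
        calls.add("post:" + channel);
    }
}

/** Dispatches the barriers of a single checkpoint to the controller in the documented order. */
final class OrderedDispatcher {
    private final RenamedController controller;
    private final int numOpenChannels;
    private int numBarriersReceived;

    OrderedDispatcher(RenamedController controller, int numOpenChannels) {
        this.controller = controller;
        this.numOpenChannels = numOpenChannels;
    }

    void processBarrier(int channel, long checkpointId) {
        if (numBarriersReceived == 0) {
            controller.preProcessFirstBarrier(channel, checkpointId);
        }
        controller.barrierReceived(channel, checkpointId);
        if (++numBarriersReceived == numOpenChannels) {
            controller.postProcessLastBarrier(channel, checkpointId);
        }
    }
}
```

With a single open channel the same barrier is both first and last, so all three callbacks fire for it in the order pre, barrier, post.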

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
##########
@@ -39,57 +39,83 @@
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
 
 /**
- * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier
- * and keeping track of the number of received barriers and consumed barriers.
+ * {@link SingleCheckpointBarrierHandler} is used for triggering checkpoint while reading the first barrier
+ * and keeping track of the number of received barriers and consumed barriers. It can handle/track
+ * just single checkpoint at a time. The behaviour when to actually trigger the checkpoint and
+ * what the {@link CheckpointableInput} should do is controlled by {@link CheckpointBarrierBehaviourController}.
  */
 @Internal
 @NotThreadSafe
-public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
+public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
 
-	private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierUnaligner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);
 
-	private final String taskName;
+	protected final String taskName;
 
-	private int numBarriersReceived;
+	protected final CheckpointBarrierBehaviourController controller;
 
-	/** A future indicating that all barriers of the a given checkpoint have been read. */
-	private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
+	protected int numBarriersReceived;
 
 	/**
 	 * The checkpoint id to guarantee that we would trigger only one checkpoint when reading the same barrier from
 	 * different channels.
 	 */
-	private long currentCheckpointId = -1L;
+	protected long currentCheckpointId = -1L;
 
-	private int numOpenChannels;
+	protected int numOpenChannels;
 
-	private final SubtaskCheckpointCoordinator checkpointCoordinator;
+	protected CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
 
-	private final CheckpointableInput[] inputs;
-
-	CheckpointBarrierUnaligner(
+	@VisibleForTesting
+	static SingleCheckpointBarrierHandler createUnalignedCheckpointBarrierHandler(
 			SubtaskCheckpointCoordinator checkpointCoordinator,
 			String taskName,
 			AbstractInvokable toNotifyOnCheckpoint,
 			CheckpointableInput... inputs) {
+		return new SingleCheckpointBarrierHandler(
+			taskName,
+			toNotifyOnCheckpoint,
+			(int) Arrays.stream(inputs).flatMap(gate -> gate.getChannelInfos().stream()).count(),
+			new UnalignedController(checkpointCoordinator, inputs));
+	}
+
+	SingleCheckpointBarrierHandler(
+			String taskName,
+			AbstractInvokable toNotifyOnCheckpoint,
+			int numOpenChannels,
+			CheckpointBarrierBehaviourController controller) {
 		super(toNotifyOnCheckpoint);
 
 		this.taskName = taskName;
-		this.inputs = inputs;
-		numOpenChannels = (int) Arrays.stream(inputs).flatMap(gate -> gate.getChannelInfos().stream()).count();
-		this.checkpointCoordinator = checkpointCoordinator;
+		this.numOpenChannels = numOpenChannels;
+		this.controller = controller;
+	}
+
+	@Override
+	protected void abortPendingCheckpoint(

Review comment:
       This method is no longer used (this class now calls `controller.abortPendingCheckpoint` instead),
   so it can be deleted.
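A sketch of the delegation the comment describes: the handler keeps the checkpoint bookkeeping and hands abort cleanup to the controller, so a handler-level `abortPendingCheckpoint` has nothing left to do. The `abortPendingCheckpoint(long, Exception)` signature and the names `AbortController` and `DelegatingAbortHandler` are assumptions; the real controller method may take a `CheckpointException` or differ in other ways.

```java
/** Stand-in for the controller; the abort signature is an assumption of this sketch. */
interface AbortController {
    void abortPendingCheckpoint(long checkpointId, Exception cause);
}

/** Handler whose abort path only delegates, so no handler-level abortPendingCheckpoint is needed. */
final class DelegatingAbortHandler {
    private final AbortController controller;
    private long currentCheckpointId = -1L;
    private int numBarriersReceived;

    DelegatingAbortHandler(AbortController controller) {
        this.controller = controller;
    }

    void barrierReceived(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId; // a newer checkpoint starts a fresh count
            numBarriersReceived = 0;
        }
        numBarriersReceived++;
    }

    /** Aborts the checkpoint in progress, if any; controller-specific cleanup is delegated. */
    void abortCurrent(Exception cause) {
        if (numBarriersReceived == 0) {
            return; // nothing pending
        }
        controller.abortPendingCheckpoint(currentCheckpointId, cause);
        numBarriersReceived = 0;
    }
}
```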



