Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/29 15:22:31 UTC

[GitHub] [flink] zhijiangW commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

zhijiangW commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432559532



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I did consider verifying the race condition with a more realistic `AbstractInvokable` backed by a `TaskMailbox`, but these two components are somewhat far removed from `CheckpointBarrierHandler`, and they are fairly heavyweight in themselves.
   
   To touch as few external components as possible in unit tests, I chose the current approach. I bypassed the mailbox implementation and simulated the race condition by executing the runnables out of order. The purpose of introducing `ValidatingCheckpointInvokable` and `ValidatingCheckpointBarrierUnaligner` is just to avoid relying on the external `AbstractInvokable` and `TaskMailbox` components in unit tests.
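
   The out-of-order execution described above can be illustrated without any Flink classes. The following is only a minimal sketch under stated assumptions: `ReorderingExecutor`, `RecordingTarget`, and `triggerOrder` are hypothetical names invented for this illustration, and plain `Runnable` stands in for Flink's `ThrowingRunnable`. It holds back the first submitted action and runs it after the second, which mirrors the idea of the overridden `executeInTaskThread` in the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class MisorderedExecutionSketch {

    /**
     * Hypothetical stand-in for a component that hands actions to a task thread.
     * It parks the first action and runs it only after the second one,
     * so the later submission executes before the earlier one.
     * The sketch is only meant for exactly two submissions.
     */
    static class ReorderingExecutor {
        private Runnable waitingRunnable;
        private boolean firstRunnable = true;

        void execute(Runnable runnable) {
            if (firstRunnable) {
                // Park the first action instead of running it.
                waitingRunnable = runnable;
                firstRunnable = false;
            } else {
                // Run the later action first, then the parked one.
                runnable.run();
                waitingRunnable.run();
            }
        }
    }

    /** Hypothetical recorder of the order in which checkpoint ids are triggered. */
    static class RecordingTarget {
        final List<Long> triggered = new ArrayList<>();

        void trigger(long checkpointId) {
            triggered.add(checkpointId);
        }
    }

    /** Submits checkpoint 1, then checkpoint 2, and returns the execution order. */
    static List<Long> triggerOrder() {
        RecordingTarget target = new RecordingTarget();
        ReorderingExecutor executor = new ReorderingExecutor();
        executor.execute(() -> target.trigger(1L));
        executor.execute(() -> target.trigger(2L));
        return target.triggered;
    }

    public static void main(String[] args) {
        System.out.println(triggerOrder()); // prints [2, 1]
    }
}
```

   A test built on this idea can then assert which checkpoint id ends up being acted on and how many triggers happen in total, which is what the getters on `ValidatingCheckpointInvokable` expose.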
   
   This test verifies the `CheckpointBarrierUnaligner#processBarrier` and `#notifyBarrierReceived` paths, to confirm that the newly introduced method `CheckpointBarrierUnaligner#notifyCheckpoint` really takes effect in these interactions. All three methods are actually exercised in this test.
   
   On the other hand, when testing the interaction between two components, it is better to verify the real interaction using the two real components, without re-implementing either side. Then any internal core change in either component will be reflected in the test. In this case, `CheckpointBarrierUnaligner` actually interacts with `AbstractInvokable` and its internal `TaskMailbox` model. `SteppingMailboxProcessor` is also a re-implemented model that replaces the real component inside `AbstractInvokable`, so the test would still rely to some extent on the private implementation inside `SteppingMailboxProcessor`.
   
   All in all, that might be better than my current approach, and I can try to use the real model as much as possible in this test.



