Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/29 10:20:12 UTC

[GitHub] [flink] zhijiangW opened a new pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

zhijiangW opened a new pull request #12406:
URL: https://github.com/apache/flink/pull/12406


   ## What is the purpose of the change
   
   The race condition happens as follows:
   
   -     CheckpointBarrierUnaligner#notifyBarrierReceived, called from the netty thread, enqueues an async checkpoint (ch1) in the mailbox.
   -     CheckpointBarrierUnaligner#processBarrier, called from the task thread, triggers a sync checkpoint (ch2) that executes immediately.
   -     When the task thread later takes ch1 from the mailbox and executes it, an illegal argument exception is thrown because the id of ch1 is smaller than that of the previously executed ch2.
       
   Before an async checkpoint action actually executes, we can compare its id with the id of the previously executed checkpoint. If it is not larger than the previous one, the action is ignored and exits directly.
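   
   The id check described above can be sketched as follows. This is a minimal illustration only, not the code in this PR: the class `CheckpointIdGuard` and its methods are hypothetical names, and the real logic lives inside `CheckpointBarrierUnaligner`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for the id check; not a Flink class. */
   final class CheckpointIdGuard {

       // Id of the most recently executed checkpoint; -1 means none has run yet.
       private long lastExecutedCheckpointId = -1L;

       // Ids of the checkpoints that were actually executed, in execution order.
       private final List<Long> executed = new ArrayList<>();

       /**
        * Executes the checkpoint unless one with an equal or larger id has
        * already run. Returns true if the checkpoint was executed.
        */
       boolean tryExecute(long checkpointId) {
           if (checkpointId <= lastExecutedCheckpointId) {
               // Stale mailbox action: ignore it instead of failing.
               return false;
           }
           lastExecutedCheckpointId = checkpointId;
           executed.add(checkpointId);
           return true;
       }

       List<Long> executedCheckpoints() {
           return executed;
       }

       public static void main(String[] args) {
           CheckpointIdGuard guard = new CheckpointIdGuard();
           System.out.println(guard.tryExecute(2)); // ch2 runs first on the task thread: true
           System.out.println(guard.tryExecute(1)); // ch1 arrives late from the mailbox: false
           System.out.println(guard.executedCheckpoints()); // [2]
       }
   }
   ```
   
   In this sketch the late action for ch1 returns without side effects, which corresponds to ignoring the async action rather than raising an exception.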
   
   ## Brief change log
   
     - *Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived*
     - *Recycle the buffer when ChannelStateWriter.addInputData throws an exception*
     - *Fix the formatting of CheckpointBarrierUnaligner*
   
   ## Verifying this change
   
     - *Added CheckpointBarrierUnalignerTest#testBufferRecycleOnNotifyBufferReceivedException*
     - *Added CheckpointBarrierUnalignerTest#testConcurrentProcessBarrierAndNotifyBarrierReceived*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhijiangW commented on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636599722


   @pnowojski , I have updated the code to address the comments above.





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447",
       "triggerID" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67af5b88a2032e6515d41c26586f2fba88cbcd0e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I didn't mean passing a real `AbstractInvokable`, but an equivalent of `DummyInvokable` that has a `SteppingMailboxProcessor`.
   
   > For this case, actually the CheckpointBarrierUnaligner component will interact with AbstractInvokable with internal TaskMailbox model. SteppingMailboxProcessor is also a re-implemented model to replace the real component inside AbstractInvokable, so it somehow still relies on the private implementation inside SteppingMailboxProcessor.
   
   It's not the same. 
   
   1. In your case, you are modifying the class that you are testing, which invalidates the test to some extent.
   
   2. The idea of mocking (for example passing `DummyInvokable` with `SteppingMailboxProcessor`) is that you provide a mock implementation of some interface that adheres to the contract of that interface (maybe in a limited/restricted scope, but it shouldn't break it), and you pass it to the real production code that you intend to test.
   
   For example, imagine that `CheckpointBarrierUnaligner` changes and enqueues one more mail in one of the calls, or that it starts relying on the fact that the order of enqueued mails guarantees their execution order. Both of those refactorings would be valid but wouldn't work with your current implementation, producing a falsely failing test. 







[GitHub] [flink] zhijiangW edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636698426


   Thanks for the review @pnowojski , I will squash the last two commits and merge!





[GitHub] [flink] zhijiangW commented on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636036676


   >As far as I understand it, it's not a full fix, as EndOfPartitionEvents or checkpoint barrier cancellation markers can still cause the same race condition, and you want to fix it in a follow up PR, right? 
   
   Exactly, this PR only resolves part of the race condition. I also think there are still some issues when handling EOF and the cancellation marker, which seem hard to reproduce in `UnalignedCheckpointITCase`. Furthermore, the current logic inside `CheckpointBarrierUnaligner` is too complicated; it is actually totally different from my initial PoC version. I will try to refactor some of the logic to simplify the process a bit while resolving the other two issues, so I chose to resolve them separately to not delay the overall progress.





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447",
       "triggerID" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bee02498f028f38676813dde3b99d318ace084aa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bee02498f028f38676813dde3b99d318ace084aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67af5b88a2032e6515d41c26586f2fba88cbcd0e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447) 
   * bee02498f028f38676813dde3b99d318ace084aa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I didn't mean passing a real `AbstractInvokable`, but an equivalent of `DummyInvokable` that has a `SteppingMailboxProcessor`.
   
   > For this case, actually the CheckpointBarrierUnaligner component will interact with AbstractInvokable with internal TaskMailbox model. SteppingMailboxProcessor is also a re-implemented model to replace the real component inside AbstractInvokable, so it somehow still relies on the private implementation inside SteppingMailboxProcessor.
   
   It's not the same. 
   
   1. In your case, you are modifying the class that you are testing, which invalidates the test to some extent. To some extent you are testing your test implementation, and you have to assume, by looking at the code, that it doesn't affect the purpose of the test.
   
   2. The idea of mocking (for example passing `DummyInvokable` with `SteppingMailboxProcessor`) is that you provide a mock implementation of some interface that adheres to the contract of that interface (maybe in a limited/restricted scope, but it shouldn't break it), and you pass it to the real production code that you intend to test.
   
   For example, imagine that `CheckpointBarrierUnaligner` changes and enqueues one more mail in one of the calls, or that it starts relying on the fact that the order of enqueued mails guarantees their execution order. Both of those refactorings would be valid but wouldn't work with your current implementation, producing a falsely failing test. 







[GitHub] [flink] zhijiangW edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636036676


   Thanks for the efficient review @pnowojski !
   
   >As far as I understand it, it's not a full fix, as EndOfPartitionEvents or checkpoint barrier cancellation markers can still cause the same race condition, and you want to fix it in a follow up PR, right? 
   
   Exactly, this PR only resolves part of the race condition. I also think there are still some issues when handling EOF and the cancellation marker, which seem hard to reproduce in `UnalignedCheckpointITCase`. Furthermore, the current logic inside `CheckpointBarrierUnaligner` is too complicated; it is actually totally different from my initial PoC version. I will try to refactor some of the logic to simplify the process a bit while resolving the other two issues, so I chose to resolve them separately to not delay the overall progress.





[GitHub] [flink] zhijiangW edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636036676


   Thanks for the efficient review @pnowojski !
   
   >As far as I understand it, it's not a full fix, as EndOfPartitionEvents or checkpoint barrier cancellation markers can still cause the same race condition, and you want to fix it in a follow up PR, right? 
   
   Exactly, this PR only resolves part of the race condition. I also think there are still some issues when handling EOF and the cancellation marker, which seem hard to reproduce in `UnalignedCheckpointITCase`. Furthermore, the current logic inside `CheckpointBarrierUnaligner` is too complicated; it is actually totally different from my initial PoC version. I will try to refactor some of the logic to simplify the process a bit while resolving the other two issues, so I chose to resolve them separately to not delay the overall progress.





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447",
       "triggerID" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bee02498f028f38676813dde3b99d318ace084aa",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452",
       "triggerID" : "bee02498f028f38676813dde3b99d318ace084aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b47a7645fd5636a98050231522a076537576e9a8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b47a7645fd5636a98050231522a076537576e9a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bee02498f028f38676813dde3b99d318ace084aa Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452) 
   * b47a7645fd5636a98050231522a076537576e9a8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhijiangW commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432561333



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -164,33 +158,32 @@ public void processBarrier(
 			hasInflightBuffers[channelIndex] = false;
 			numBarrierConsumed++;
 		}
-		// processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
-		// buffer queues
-		// to avoid replicating any logic, we simply call notifyBarrierReceived here as well

Review comment:
       It indeed provides somewhat richer information than the javadoc of this method, so I can consider retaining it. 







[GitHub] [flink] flinkbot commented on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "67af5b88a2032e6515d41c26586f2fba88cbcd0e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67af5b88a2032e6515d41c26586f2fba88cbcd0e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhijiangW commented on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-636698426


   Thanks for the review @pnowojski , merging!





[GitHub] [flink] pnowojski commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432413567



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -164,33 +158,32 @@ public void processBarrier(
 			hasInflightBuffers[channelIndex] = false;
 			numBarrierConsumed++;
 		}
-		// processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
-		// buffer queues
-		// to avoid replicating any logic, we simply call notifyBarrierReceived here as well

Review comment:
       nit: isn't this comment still valid and worth keeping?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -284,21 +280,18 @@ private int getFlattenedChannelIndex(InputChannelInfo channelInfo) {
 		 */
 		private long currentReceivedCheckpointId = -1L;
 
-		/** The number of opened channels. */
+		/** The number of open channels. */

Review comment:
       nit: just drop the comment as it only adds words "the" and "of" to the variable name

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I think this is not the best way to test this race condition. It's dubious to override a class that we want to test here (`ValidatingCheckpointBarrierUnaligner`). Also, this is very tight coupling that depends on private implementation details, as it assumes `executeInTaskThread` will be called only once. Moreover, it breaks the assumption that mails should be executed in order.
   
   I think it would be much better to do it in the following manner:
   
   1. straighten up the threading model a bit and do not enqueue any mails in the `CheckpointBarrierUnaligner#processBarrier` call, as this is already happening inside the mailbox thread. It introduces an unnecessary possibility of race conditions and makes our lives so much more difficult in this test. Currently, by going through the mailbox in `processBarrier`, we avoid a bit of code duplication and slightly simplify the `notifyBarrierReceived` method, in exchange for a bad threading model and, IMO, actually more complicated code. We should fix this either way.
   
   2. pass an `AbstractInvokable` instance that would be using `SteppingMailboxProcessor` to implement `org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#executeInTaskThread` 
   
   3. inside the `testConcurrentProcessBarrierAndNotifyBarrierReceived` test do the following sequence:
   ```
   handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(0), channelInfo); // (a)
   handler.processBarrier(buildCheckpointBarrier(1), 0); // (b)
   steppingMailboxExecutor.runMailboxStep(); // (c)
   ```
   (c) would execute mailbox action from (a). 
   
   This would test the race condition without breaking any contracts (like out-of-order mail execution) and without overriding `CheckpointBarrierUnaligner`.
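   
   As a rough, self-contained model of the (a), (b), (c) sequence, the sketch below uses a plain FIFO queue in place of the mailbox. All names in it (`SteppingMailboxSketch`, `triggerIfNewer`, and so on) are hypothetical simplifications chosen for illustration; they are not the Flink classes or signatures mentioned above, and the stale-id check is assumed to be the fix under test.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;

   /** Hypothetical, simplified model of the test sequence; not Flink code. */
   final class SteppingMailboxSketch {

       // Mails are executed strictly in the order they were enqueued.
       private final Queue<Runnable> mailbox = new ArrayDeque<>();
       private final List<Long> executed = new ArrayList<>();
       private long lastExecutedCheckpointId = -1L;

       /** (a) Netty-thread path: only enqueues a mail, nothing runs yet. */
       void notifyBarrierReceived(long checkpointId) {
           mailbox.add(() -> triggerIfNewer(checkpointId));
       }

       /** (b) Task-thread path: triggers the checkpoint directly, without a mail. */
       void processBarrier(long checkpointId) {
           triggerIfNewer(checkpointId);
       }

       /** (c) Runs exactly one queued mail; returns false if the mailbox is empty. */
       boolean runMailboxStep() {
           Runnable mail = mailbox.poll();
           if (mail == null) {
               return false;
           }
           mail.run();
           return true;
       }

       // Assumed fix: a checkpoint whose id is not larger than the last one is ignored.
       private void triggerIfNewer(long checkpointId) {
           if (checkpointId <= lastExecutedCheckpointId) {
               return;
           }
           lastExecutedCheckpointId = checkpointId;
           executed.add(checkpointId);
       }

       List<Long> executedCheckpoints() {
           return executed;
       }

       public static void main(String[] args) {
           SteppingMailboxSketch sketch = new SteppingMailboxSketch();
           sketch.notifyBarrierReceived(0); // (a)
           sketch.processBarrier(1);        // (b)
           sketch.runMailboxStep();         // (c) runs the mail from (a), which is now stale
           System.out.println(sketch.executedCheckpoints()); // [1]
       }
   }
   ```
   
   The mail from (a) still runs in enqueue order, so no mailbox contract is bent; the test only needs to assert that checkpoint 1 was triggered once and checkpoint 0 was skipped.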







[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   ## CI report:
   
   * 67af5b88a2032e6515d41c26586f2fba88cbcd0e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447) 
   * bee02498f028f38676813dde3b99d318ace084aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   ## CI report:
   
   * bee02498f028f38676813dde3b99d318ace084aa Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452) 
   * b47a7645fd5636a98050231522a076537576e9a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2506) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   ## CI report:
   
   * b47a7645fd5636a98050231522a076537576e9a8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2506) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   ## CI report:
   
   * 67af5b88a2032e6515d41c26586f2fba88cbcd0e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2447) 
   * bee02498f028f38676813dde3b99d318ace084aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I didn't mean passing a real `AbstractInvokable`, but an equivalent of `DummyInvokable` that has a `SteppingMailboxProcessor` (`SteppingMailboxProcessor` already exists in the code base, and I'm not even sure whether you could just use a plain `MailboxProcessor`).
   
   > For this case, actually the CheckpointBarrierUnaligner component will interact with AbstractInvokable with internal TaskMailbox model. SteppingMailboxProcessor is also a re-implemented model to replace the real component inside AbstractInvokable, so it somehow still relies on the private implementation inside SteppingMailboxProcessor.
   
   It's not the same. 
   
   1. In your case, you are modifying the class that you are testing, which invalidates the test to some extent. To some degree you are testing your test implementation, and you have to assume, by looking at the code, that it doesn't affect the purpose of the test.
   
   2. The idea of mocking (for example passing `DummyInvokable` with `SteppingMailboxProcessor`) is that you provide a mock implementation of some interface that adheres to the contract of that interface (maybe in a limited/restricted scope, but it shouldn't be breaking it), and you pass it to the real production code that you intend to test.
   
   For example, imagine that `CheckpointBarrierUnaligner` changes and enqueues one more mail in one of the calls, or that it starts relying on the fact that the order of enqueued mails guarantees their execution order. Both of those refactorings would be valid, but they wouldn't work with your current implementation, giving a falsely failing test.
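   
   A small, self-contained sketch of that brittleness, assuming nothing beyond the override quoted in the diff above. `ReorderingExecutor` is a hypothetical stand-in that copies the override's logic (hold the first runnable, then run each later runnable followed by the held one); it is not Flink code.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Toy model only: mirrors the reordering logic of the override under review. */
   final class ReorderingOverrideSketch {
   
       /** Holds the first runnable; every later call runs its runnable, then the held one. */
       static final class ReorderingExecutor {
           private Runnable waiting;
           private boolean first = true;
   
           void execute(Runnable runnable) {
               if (first) {
                   waiting = runnable;
                   first = false;
               } else {
                   runnable.run();
                   waiting.run();
               }
           }
       }
   
       /** Submits mails numbered 1..mails and returns the order in which they actually ran. */
       static List<Integer> executionOrder(int mails) {
           List<Integer> order = new ArrayList<>();
           ReorderingExecutor executor = new ReorderingExecutor();
           for (int i = 1; i <= mails; i++) {
               final int id = i;
               executor.execute(() -> order.add(id));
           }
           return order;
       }
   
       public static void main(String[] args) {
           System.out.println(executionOrder(1)); // prints []
           System.out.println(executionOrder(2)); // prints [2, 1]
           System.out.println(executionOrder(3)); // prints [2, 1, 3, 1]
       }
   }
   ```
   
   With exactly two mails the executor produces the intended swap. With one mail nothing runs at all, and with three the held mail runs twice, so the test only behaves as intended while the production code enqueues exactly two mails.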







[GitHub] [flink] flinkbot edited a comment on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635907759


   ## CI report:
   
   * bee02498f028f38676813dde3b99d318ace084aa Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2452) 
   





[GitHub] [flink] zhijiangW merged pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW merged pull request #12406:
URL: https://github.com/apache/flink/pull/12406


   





[GitHub] [flink] zhijiangW commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432559532



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I did consider verifying the race condition via a more or less real `AbstractInvokable` with `TaskMailbox`, but I also thought that these two components are a bit far away from `CheckpointBarrierHandler` and are rather heavyweight in themselves.
   
   To touch fewer external components in unit tests, I chose the current way. I actually bypassed the mailbox implementation and simulated the race condition by executing the runnables out of order. The purpose of introducing `ValidatingCheckpointInvokable` and `ValidatingCheckpointBarrierUnaligner` is just to avoid relying on the external components `AbstractInvokable` and `TaskMailbox` in unit tests.
   
   And this test verifies the processes of `CheckpointBarrierUnaligner#processBarrier` and `#notifyBarrierReceived`, to confirm that the newly introduced method `CheckpointBarrierUnaligner#notifyCheckpoint` really takes effect in these interactions. All three methods are actually exercised in this test.
   
   From another aspect, for the interaction between two components it is better to verify the real interactions using the two real components, without re-implementing either side. Then any internal core change in either component will be reflected in the test. In this case, the `CheckpointBarrierUnaligner` component actually interacts with `AbstractInvokable` and its internal `TaskMailbox` model. `SteppingMailboxProcessor` is also a re-implemented model that replaces the real component inside `AbstractInvokable`, so it still relies to some extent on the private implementation inside `SteppingMailboxProcessor`.
   
   All in all, it might be better than my current way, and I can try to use the real model as much as possible in this test.







[GitHub] [flink] flinkbot commented on pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12406:
URL: https://github.com/apache/flink/pull/12406#issuecomment-635898084


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 67af5b88a2032e6515d41c26586f2fba88cbcd0e (Fri May 29 10:25:27 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   <details>
   <summary>Bot commands</summary>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #12406: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
 			return lastCanceledCheckpointId;
 		}
 	}
+
+	/**
+	 * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+	 * id is executed and how many checkpoints are executed.
+	 */
+	private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+		private long expectedCheckpointId;
+
+		private int totalNumCheckpoints;
+
+		ValidatingCheckpointInvokable() {
+			super(new DummyEnvironment("test", 1, 0));
+		}
+
+		public void invoke() {
+			throw new UnsupportedOperationException();
+		}
+
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+			expectedCheckpointId = checkpointMetaData.getCheckpointId();
+			totalNumCheckpoints++;
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		long getTriggeredCheckpointId() {
+			return expectedCheckpointId;
+		}
+
+		int getTotalTriggeredCheckpoints() {
+			return totalNumCheckpoints;
+		}
+	}
+
+	/**
+	 * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+	 * checkpoint executes before the preceding triggered checkpoint.
+	 */
+	private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+		private ThrowingRunnable waitingRunnable;
+		private boolean firstRunnable = true;
+
+		ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+			super(
+				new int[]{1},
+				new ChannelStateWriter.NoOpChannelStateWriter(),
+				"test",
+				invokable);
+		}
+
+		@Override
+		protected <E extends Exception> void executeInTaskThread(
+				ThrowingRunnable<E> runnable,
+				String descriptionFormat,
+				Object... descriptionArgs) throws E {
+			if (firstRunnable) {
+				waitingRunnable = runnable;
+				firstRunnable = false;
+			} else {
+				super.executeInTaskThread(runnable, "checkpoint");
+				super.executeInTaskThread(waitingRunnable, "checkpoint");
+			}
+		}
+	}

Review comment:
       I didn't mean passing a real `AbstractInvokable`, but an equivalent of `DummyInvokable` that has a `SteppingMailboxProcessor`.
   
   > For this case, actually the CheckpointBarrierUnaligner component will interact with AbstractInvokable with internal TaskMailbox model. SteppingMailboxProcessor is also a re-implemented model to replace the real component inside AbstractInvokable, so it somehow still relies on the private implementation inside SteppingMailboxProcessor.
   
   It's not the same. 
   
   1. In your case, you are modifying the class that you are testing, which invalidates the test to some extent.
   
   2. The idea of mocking (for example passing `DummyInvokable` with `SteppingMailboxProcessor`) is that you provide a mock implementation of some interface that adheres to the contract of that interface (maybe in a limited/restricted scope, but it shouldn't be breaking it), and you pass it to the real production code that you intend to test.
   
   For example, imagine a simple change where `CheckpointBarrierUnaligner` enqueues one more mail in one of the calls, or where it starts relying on the fact that the order of enqueued mails guarantees their execution order. Both of those refactorings would be valid, but they wouldn't work with your current implementation.



