You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/02 03:55:46 UTC

[GitHub] [flink] zhijiangW commented on a change in pull request #10347: [FLINK-14516][network] Remove non credit-based flow control code

zhijiangW commented on a change in pull request #10347: [FLINK-14516][network] Remove non credit-based flow control code
URL: https://github.com/apache/flink/pull/10347#discussion_r352411778
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
 ##########
 @@ -354,4 +363,83 @@ public void describeTo(Description description) {
 			description.appendText("CheckpointMetaData - id = " + checkpointId);
 		}
 	}
+
+	private static class CheckpointNotificationVerifier extends AbstractInvokable {
+		private final List<Long> triggeredCheckpoints = new ArrayList<>();
+		private long expectedAbortCheckpointId = -1;
+		@Nullable
+		private CheckpointFailureReason expectedCheckpointFailureReason;
+
+		public CheckpointNotificationVerifier() {
+			super(new MockEnvironmentBuilder().build());
+		}
+
+		@Override
+		public void invoke() throws Exception {
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
+			try {
+				if (expectedAbortCheckpointId != checkpointId || !matchesExpectedCause(cause)) {
+					throw new Exception(cause);
+				}
+			}
+			finally {
+				expectedAbortCheckpointId = -1;
+				expectedCheckpointFailureReason = null;
+			}
+		}
+
+		private boolean matchesExpectedCause(Throwable cause) {
+			if (expectedCheckpointFailureReason == null) {
+				return true;
+			}
+			Optional<CheckpointException> checkpointException = findThrowable(cause, CheckpointException.class);
+			if (!checkpointException.isPresent()) {
+				return false;
+			}
+			return checkpointException.get().getCheckpointFailureReason() == expectedCheckpointFailureReason;
+		}
+
+		public AssertCheckpointWasAborted expectAbortCheckpoint(
+				long expectedAbortCheckpointId,
+				CheckpointFailureReason expectedCheckpointFailureReason) {
+			this.expectedAbortCheckpointId = expectedAbortCheckpointId;
+			this.expectedCheckpointFailureReason = expectedCheckpointFailureReason;
+			return new AssertCheckpointWasAborted(this);
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) throws Exception {
+			triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
+		}
+
+		public List<Long> getAndResetTriggeredCheckpoints() {
+			List<Long> copy = new ArrayList<>(triggeredCheckpoints);
+			triggeredCheckpoints.clear();
+			return copy;
+		}
+	}
+
+	private static class AssertCheckpointWasAborted implements Closeable {
+		private CheckpointNotificationVerifier checkpointNotificationVerifier;
 
 Review comment:
   nit: final

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services