You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/12 13:49:03 UTC

[flink] branch release-1.11 updated (38b822f -> d6364ce)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 38b822f  [FLINK-17977][core] Silence type extractor warnings for built-in Row
     new 90a7dab  [FLINK-18137] Handle discarding of triggering checkpoint correctly
     new 76efc78  [hotfix] Add @Nullable annotation to FutureUtils.getWithoutException
     new af15045  [hotfix] Make sure that no exceptions are swallowed in CheckpointCoordinator.startTriggeringCheckpoint
     new 180533c  [FLINK-18233][tests] Increase test timeout to 20s for TaskExecutorSubmissionTest
     new d6364ce  [FLINK-18259][tests] Increase heartbeat timeouts for HeartbeatManagerTest

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  | 63 +++++++++++++---------
 .../flink/runtime/concurrent/FutureUtils.java      |  3 ++
 .../CheckpointCoordinatorTriggeringTest.java       | 57 ++++++++++++++++++++
 .../runtime/heartbeat/HeartbeatManagerTest.java    | 24 ++++-----
 .../taskexecutor/TaskExecutorSubmissionTest.java   | 22 ++++----
 5 files changed, 122 insertions(+), 47 deletions(-)


[flink] 01/05: [FLINK-18137] Handle discarding of triggering checkpoint correctly

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90a7dab894b12b85f21d4cfc637d0d770841cfe5
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 16:17:51 2020 +0200

    [FLINK-18137] Handle discarding of triggering checkpoint correctly
    
    Before discarding a triggering checkpoint could cause a NPE which would stop the
    processing of subsequent checkpoint requests. This commit changes this behaviour
    by checking this condition and instantiating a proper exception in case that a
    triggering checkpoint is being discarded.
    
    This closes #12611.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 52 ++++++++++++--------
 .../CheckpointCoordinatorTriggeringTest.java       | 57 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..6c033d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -544,27 +544,39 @@ public class CheckpointCoordinator {
 						final PendingCheckpoint checkpoint =
 							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
 
-						if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
-							// no exception, no discarding, everything is OK
-							final long checkpointId = checkpoint.getCheckpointId();
-							snapshotTaskState(
-								timestamp,
-								checkpointId,
-								checkpoint.getCheckpointStorageLocation(),
-								request.props,
-								executions,
-								request.advanceToEndOfTime);
-
-							coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-							onTriggerSuccess();
+						Preconditions.checkState(
+							checkpoint != null || throwable != null,
+							"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+						if (throwable != null) {
+							// the initialization might not be finished yet
+							if (checkpoint == null) {
+								onTriggerFailure(request, throwable);
+							} else {
+								onTriggerFailure(checkpoint, throwable);
+							}
 						} else {
-								// the initialization might not be finished yet
-								if (checkpoint == null) {
-									onTriggerFailure(request, throwable);
-								} else {
-									onTriggerFailure(checkpoint, throwable);
-								}
+							if (checkpoint.isDiscarded()) {
+								onTriggerFailure(
+									checkpoint,
+									new CheckpointException(
+										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+										checkpoint.getFailureCause()));
+							} else {
+								// no exception, no discarding, everything is OK
+								final long checkpointId = checkpoint.getCheckpointId();
+								snapshotTaskState(
+									timestamp,
+									checkpointId,
+									checkpoint.getCheckpointStorageLocation(),
+									request.props,
+									executions,
+									request.advanceToEndOfTime);
+
+								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+								onTriggerSuccess();
+							}
 						}
 					},
 					timer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 140441d..3dca350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -45,14 +48,19 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -529,6 +537,48 @@ checkpointCoordinator.startCheckpointScheduler();
 		assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
 	}
 
+	/**
+	 * This test only fails eventually.
+	 */
+	@Test
+	public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+		final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID());
+
+		final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+		final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+			.setTasks(new ExecutionVertex[]{executionVertex})
+			.setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService))
+			.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+				.build())
+			.build();
+
+		final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
+		final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+		checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+		try {
+			checkpointCoordinator.triggerCheckpoint(false);
+			final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+			triggerCheckpointLatch.await();
+			masterHookCheckpointFuture.complete("Completed");
+
+			// discard triggering checkpoint
+			checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+			try {
+				// verify that the second checkpoint request will be executed and eventually times out
+				secondCheckpoint.get();
+				fail("Expected the second checkpoint to fail.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(CheckpointException.class));
+			}
+		} finally {
+			checkpointCoordinator.shutdown(JobStatus.FINISHED);
+			ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, scheduledExecutorService);
+		}
+	}
+
 	private CheckpointCoordinator createCheckpointCoordinator() {
 		return new CheckpointCoordinatorBuilder()
 			.setTimer(manuallyTriggeredScheduledExecutor)
@@ -568,9 +618,15 @@ checkpointCoordinator.startCheckpointScheduler();
 			new CheckpointCoordinatorTestingUtils.StringSerializer();
 
 		private final CompletableFuture<String> checkpointFuture;
+		private final OneShotLatch triggerCheckpointLatch;
 
 		private TestingMasterHook(CompletableFuture<String> checkpointFuture) {
+			this(checkpointFuture, new OneShotLatch());
+		}
+
+		private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) {
 			this.checkpointFuture = checkpointFuture;
+			this.triggerCheckpointLatch = triggerCheckpointLatch;
 		}
 
 		@Override
@@ -582,6 +638,7 @@ checkpointCoordinator.startCheckpointScheduler();
 		@Override
 		public CompletableFuture<String> triggerCheckpoint(
 			long checkpointId, long timestamp, Executor executor) {
+			triggerCheckpointLatch.trigger();
 			return checkpointFuture;
 		}
 


[flink] 04/05: [FLINK-18233][tests] Increase test timeout to 20s for TaskExecutorSubmissionTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 180533c668778bd032732b21e8c24fe0e664518b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jun 10 15:50:45 2020 +0200

    [FLINK-18233][tests] Increase test timeout to 20s for TaskExecutorSubmissionTest
    
    This closes #12587.
---
 .../taskexecutor/TaskExecutorSubmissionTest.java   | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 5c57b0f..36ebd46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -97,6 +97,8 @@ import static org.mockito.Mockito.mock;
  */
 public class TaskExecutorSubmissionTest extends TestLogger {
 
+	private static final long TEST_TIMEOUT = 20000L;
+
 	@Rule
 	public final TestName testName = new TestName();
 
@@ -107,7 +109,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	/**
 	 * Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testTaskSubmission() throws Exception {
 		final ExecutionAttemptID eid = new ExecutionAttemptID();
 
@@ -134,7 +136,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	 * Tests that the TaskManager sends a proper exception back to the sender if the submit task
 	 * message fails.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testSubmitTaskFailure() throws Exception {
 		final ExecutionAttemptID eid = new ExecutionAttemptID();
 
@@ -160,7 +162,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	/**
 	 * Tests that we can cancel the task of the TaskManager given that we've submitted it.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testTaskSubmissionAndCancelling() throws Exception {
 		final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 		final ExecutionAttemptID eid2 = new ExecutionAttemptID();
@@ -205,7 +207,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	 * Tests that submitted tasks will fail when attempting to send/receive data if no
 	 * ResultPartitions/InputGates are set up.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testGateChannelEdgeMismatch() throws Exception {
 		final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 		final ExecutionAttemptID eid2 = new ExecutionAttemptID();
@@ -247,7 +249,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 		}
 	}
 
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testRunJobWithForwardChannel() throws Exception {
 		ResourceID producerLocation = ResourceID.generate();
 		NettyShuffleDescriptor sdd =
@@ -307,7 +309,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	 * state update back to the job manager.
 	 * the second one blocks to be canceled
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testCancellingDependentAndStateUpdateFails() throws Exception {
 		ResourceID producerLocation = ResourceID.generate();
 		NettyShuffleDescriptor sdd =
@@ -373,7 +375,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	/**
 	 * Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testRemotePartitionNotFound() throws Exception {
 		final int dataPort = NetUtils.getAvailablePort();
 		Configuration config = new Configuration();
@@ -460,7 +462,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	/**
 	 *  Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testLocalPartitionNotFound() throws Exception {
 		ResourceID producerLocation = ResourceID.generate();
 		NettyShuffleDescriptor shuffleDescriptor =
@@ -509,7 +511,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	 * memory segment, we'll block the invokable and wait for the task failure due to the failed
 	 * schedule or update consumers call.
 	 */
-	@Test(timeout = 10000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testFailingScheduleOrUpdateConsumers() throws Exception {
 		final Configuration configuration = new Configuration();
 
@@ -566,7 +568,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 	/**
 	 * Tests request of task back pressure.
 	 */
-	@Test(timeout = 20000L)
+	@Test(timeout = TEST_TIMEOUT)
 	public void testRequestTaskBackPressure() throws Exception {
 		final NettyShuffleDescriptor shuffleDescriptor = newBuilder().buildLocal();
 		final TaskDeploymentDescriptor tdd = createSender(shuffleDescriptor, OutputBlockedInvokable.class);


[flink] 03/05: [hotfix] Make sure that no exceptions are swallowed in CheckpointCoordinator.startTriggeringCheckpoint

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit af15045e87ed2068156e1e408262212b73621b22
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 16:22:06 2020 +0200

    [hotfix] Make sure that no exceptions are swallowed in CheckpointCoordinator.startTriggeringCheckpoint
    
    In order to avoid that CompletableFutures don't swallow exception they need to terminate with an exception handler.
    FutureUtils.assertNoException(CompletableFuture) asserts that the given future does not complete exceptionally. If
    it does, then the system will fail and the exception will be reported.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 83 +++++++++++-----------
 1 file changed, 43 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6c033d9..307ad90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -538,48 +538,51 @@ public class CheckpointCoordinator {
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
-			CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-				.whenCompleteAsync(
-					(ignored, throwable) -> {
-						final PendingCheckpoint checkpoint =
-							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-						Preconditions.checkState(
-							checkpoint != null || throwable != null,
-							"Either the pending checkpoint needs to be created or an error must have been occurred.");
-
-						if (throwable != null) {
-							// the initialization might not be finished yet
-							if (checkpoint == null) {
-								onTriggerFailure(request, throwable);
-							} else {
-								onTriggerFailure(checkpoint, throwable);
-							}
-						} else {
-							if (checkpoint.isDiscarded()) {
-								onTriggerFailure(
-									checkpoint,
-									new CheckpointException(
-										CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-										checkpoint.getFailureCause()));
+			FutureUtils.assertNoException(
+				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
+					.handleAsync(
+						(ignored, throwable) -> {
+							final PendingCheckpoint checkpoint =
+								FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+							Preconditions.checkState(
+								checkpoint != null || throwable != null,
+								"Either the pending checkpoint needs to be created or an error must have been occurred.");
+
+							if (throwable != null) {
+								// the initialization might not be finished yet
+								if (checkpoint == null) {
+									onTriggerFailure(request, throwable);
+								} else {
+									onTriggerFailure(checkpoint, throwable);
+								}
 							} else {
-								// no exception, no discarding, everything is OK
-								final long checkpointId = checkpoint.getCheckpointId();
-								snapshotTaskState(
-									timestamp,
-									checkpointId,
-									checkpoint.getCheckpointStorageLocation(),
-									request.props,
-									executions,
-									request.advanceToEndOfTime);
-
-								coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-								onTriggerSuccess();
+								if (checkpoint.isDiscarded()) {
+									onTriggerFailure(
+										checkpoint,
+										new CheckpointException(
+											CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+											checkpoint.getFailureCause()));
+								} else {
+									// no exception, no discarding, everything is OK
+									final long checkpointId = checkpoint.getCheckpointId();
+									snapshotTaskState(
+										timestamp,
+										checkpointId,
+										checkpoint.getCheckpointStorageLocation(),
+										request.props,
+										executions,
+										request.advanceToEndOfTime);
+
+									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
+									onTriggerSuccess();
+								}
 							}
-						}
-					},
-					timer);
+
+							return null;
+						},
+						timer));
 		} catch (Throwable throwable) {
 			onTriggerFailure(request, throwable);
 		}


[flink] 05/05: [FLINK-18259][tests] Increase heartbeat timeouts for HeartbeatManagerTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6364cec088d5b3439da7077b053120e12880769
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 18:37:33 2020 +0200

    [FLINK-18259][tests] Increase heartbeat timeouts for HeartbeatManagerTest
    
    Increasing the heartbeat timeouts should harden the tests in case of slow
    testing machines.
    
    This closes #12612.
---
 .../runtime/heartbeat/HeartbeatManagerTest.java    | 24 ++++++++++------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 14d70aa..30928fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -61,6 +61,8 @@ import static org.mockito.Mockito.when;
  */
 public class HeartbeatManagerTest extends TestLogger {
 	private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
+	public static final long HEARTBEAT_INTERVAL = 50L;
+	public static final long HEARTBEAT_TIMEOUT = 200L;
 
 	/**
 	 * Tests that regular heartbeat signal triggers the right callback functions in the
@@ -145,9 +147,7 @@ public class HeartbeatManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testHeartbeatTimeout() throws Exception {
-		long heartbeatTimeout = 100L;
 		int numHeartbeats = 6;
-		long heartbeatInterval = 20L;
 		final int payload = 42;
 
 		ResourceID ownResourceID = new ResourceID("foobar");
@@ -160,7 +160,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			.createNewTestingHeartbeatListener();
 
 		HeartbeatManagerImpl<Integer, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
-			heartbeatTimeout,
+			HEARTBEAT_TIMEOUT,
 			ownResourceID,
 			heartbeatListener,
 			TestingUtils.defaultScheduledExecutor(),
@@ -173,12 +173,12 @@ public class HeartbeatManagerTest extends TestLogger {
 
 		for (int i = 0; i < numHeartbeats; i++) {
 			heartbeatManager.receiveHeartbeat(targetResourceID, payload);
-			Thread.sleep(heartbeatInterval);
+			Thread.sleep(HEARTBEAT_INTERVAL);
 		}
 
 		assertFalse(timeoutFuture.isDone());
 
-		ResourceID timeoutResourceID = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+		ResourceID timeoutResourceID = timeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
 
 		assertEquals(targetResourceID, timeoutResourceID);
 	}
@@ -193,8 +193,6 @@ public class HeartbeatManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testHeartbeatCluster() throws Exception {
-		long heartbeatTimeout = 100L;
-		long heartbeatPeriod = 20L;
 		ResourceID resourceIdTarget = new ResourceID("foobar");
 		ResourceID resourceIDSender = new ResourceID("barfoo");
 		final int targetPayload = 42;
@@ -214,15 +212,15 @@ public class HeartbeatManagerTest extends TestLogger {
 			.createNewTestingHeartbeatListener();
 
 		HeartbeatManagerImpl<String, Integer> heartbeatManagerTarget = new HeartbeatManagerImpl<>(
-			heartbeatTimeout,
+			HEARTBEAT_TIMEOUT,
 			resourceIdTarget,
 			heartbeatListenerTarget,
 			TestingUtils.defaultScheduledExecutor(),
 			LOG);
 
 		HeartbeatManagerSenderImpl<Integer, String> heartbeatManagerSender = new HeartbeatManagerSenderImpl<>(
-			heartbeatPeriod,
-			heartbeatTimeout,
+			HEARTBEAT_INTERVAL,
+			HEARTBEAT_TIMEOUT,
 			resourceIDSender,
 			heartbeatListenerSender,
 			TestingUtils.defaultScheduledExecutor(),
@@ -231,17 +229,17 @@ public class HeartbeatManagerTest extends TestLogger {
 		heartbeatManagerTarget.monitorTarget(resourceIDSender, heartbeatManagerSender);
 		heartbeatManagerSender.monitorTarget(resourceIdTarget, heartbeatManagerTarget);
 
-		Thread.sleep(2 * heartbeatTimeout);
+		Thread.sleep(2 * HEARTBEAT_TIMEOUT);
 
 		assertFalse(targetHeartbeatTimeoutFuture.isDone());
 
 		heartbeatManagerTarget.stop();
 
-		ResourceID timeoutResourceID = targetHeartbeatTimeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
+		ResourceID timeoutResourceID = targetHeartbeatTimeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
 
 		assertThat(timeoutResourceID, is(resourceIdTarget));
 
-		int numberHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod);
+		int numberHeartbeats = (int) (2 * HEARTBEAT_TIMEOUT / HEARTBEAT_INTERVAL);
 
 		final Matcher<Integer> numberHeartbeatsMatcher = greaterThanOrEqualTo(numberHeartbeats / 2);
 		assertThat(numReportPayloadCallsTarget.get(), is(numberHeartbeatsMatcher));


[flink] 02/05: [hotfix] Add @Nullable annotation to FutureUtils.getWithoutException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76efc7811bcd159089d2b6687211ebb99ba04352
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jun 11 16:20:46 2020 +0200

    [hotfix] Add @Nullable annotation to FutureUtils.getWithoutException
---
 .../src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 7ea36e0..05be10d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.util.function.SupplierWithException;
 
 import akka.dispatch.OnComplete;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1007,6 +1009,7 @@ public class FutureUtils {
 	 * @return the result of completable future,
 	 * or null if it's unfinished or finished exceptionally
 	 */
+	@Nullable
 	public static <T> T getWithoutException(CompletableFuture<T> future) {
 		if (future.isDone() && !future.isCompletedExceptionally()) {
 			try {