You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/10 09:32:39 UTC

[flink] branch release-1.11 updated: [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b1f32c6  [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
b1f32c6 is described below

commit b1f32c6efd732787e27f37fbe18fe88f9e38f7c2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Jun 8 16:05:51 2020 +0200

    [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
    
    This commit hardens all CancelingTestBase tests by replacing the hard-coded 1000 ms
    GET_FUTURE_TIMEOUT with the configured Akka ask timeout of 200s as the RPC timeout.
    
    This closes #12531.
---
 .../apache/flink/test/cancelling/CancelingTestBase.java   | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index b3905c0..03bc6eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger {
 
 	protected static final int PARALLELISM = 4;
 
-	protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds
+	private static final Configuration configuration = getConfiguration();
 
 	// --------------------------------------------------------------------------------------------
 
 	@ClassRule
 	public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
 		new MiniClusterResourceConfiguration.Builder()
-			.setConfiguration(getConfiguration())
+			.setConfiguration(configuration)
 			.setNumberTaskManagers(2)
 			.setNumberSlotsPerTaskManager(4)
 			.build());
@@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger {
 		// submit job
 		final JobGraph jobGraph = getJobGraph(plan);
 
+		final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
+
 		ClusterClient<?> client = CLUSTER.getClusterClient();
 		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
 
 		Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 
-		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatus != JobStatus.RUNNING) {
 			Assert.fail("Job not in state RUNNING.");
@@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends TestLogger {
 
 		Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatusAfterCancel != JobStatus.CANCELED) {
 			Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');