You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/10 09:32:39 UTC
[flink] branch release-1.11 updated: [FLINK-17498][tests] Increase
CancelingTestBase rpc timeout to configured Akka ask timeout
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b1f32c6 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
b1f32c6 is described below
commit b1f32c6efd732787e27f37fbe18fe88f9e38f7c2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Jun 8 16:05:51 2020 +0200
[FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of
200s as the rpc timeout.
This closes #12531.
---
.../apache/flink/test/cancelling/CancelingTestBase.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index b3905c0..03bc6eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger {
protected static final int PARALLELISM = 4;
- protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds
+ private static final Configuration configuration = getConfiguration();
// --------------------------------------------------------------------------------------------
@ClassRule
public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfiguration())
+ .setConfiguration(configuration)
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(4)
.build());
@@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
+ final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
+
ClusterClient<?> client = CLUSTER.getClusterClient();
JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
- JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
- jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
@@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends TestLogger {
Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
- JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
- jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');