You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/03 14:34:32 UTC
[1/3] flink git commit: [hotfix] Introduce NewClusterClient interface
Repository: flink
Updated Branches:
refs/heads/master a666455c9 -> db366cd3d
[hotfix] Introduce NewClusterClient interface
The NewClusterClient interface contains asynchronous submitJob and requestJobResult
methods which will replace the ClusterClient#submitJob method.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72227451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72227451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72227451
Branch: refs/heads/master
Commit: 722274519fbf258531707ef85df4e1846f2fa4d4
Parents: a666455
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 27 09:20:33 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 14:48:43 2018 +0200
----------------------------------------------------------------------
.../flink/client/program/MiniClusterClient.java | 45 ++++++++++++---
.../flink/client/program/NewClusterClient.java | 50 ++++++++++++++++
.../client/program/rest/RestClusterClient.java | 27 +++++----
.../flink/runtime/minicluster/MiniCluster.java | 60 ++++++++++++--------
4 files changed, 138 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/72227451/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index a135359..faf96ec 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,23 +21,25 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
@@ -45,11 +47,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
/**
* Client to interact with a {@link MiniCluster}.
*/
-public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {
+public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> implements NewClusterClient {
private final MiniCluster miniCluster;
@@ -66,28 +69,54 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+ final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);
+
if (isDetached()) {
try {
- miniCluster.runDetached(jobGraph);
- } catch (JobExecutionException | InterruptedException e) {
+ return jobSubmissionResultFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ ExceptionUtils.checkInterrupted(e);
+
throw new ProgramInvocationException(
String.format("Could not run job %s in detached mode.", jobGraph.getJobID()),
e);
}
-
- return new JobSubmissionResult(jobGraph.getJobID());
} else {
+ final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
+ (JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));
+
+ final JobResult jobResult;
try {
- return miniCluster.executeJobBlocking(jobGraph);
- } catch (JobExecutionException | InterruptedException e) {
+ jobResult = jobResultFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ ExceptionUtils.checkInterrupted(e);
+
throw new ProgramInvocationException(
String.format("Could not run job %s.", jobGraph.getJobID()),
e);
}
+
+ try {
+ return jobResult.toJobExecutionResult(classLoader);
+ } catch (JobResult.WrappedJobException e) {
+ throw new ProgramInvocationException(e.getCause());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new ProgramInvocationException(e);
+ }
}
}
@Override
+ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
+ return miniCluster.submitJob(jobGraph);
+ }
+
+ @Override
+ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+ return miniCluster.requestJobResult(jobId);
+ }
+
+ @Override
public void cancel(JobID jobId) throws Exception {
miniCluster.cancelJob(jobId).get();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/72227451/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
new file mode 100644
index 0000000..513f7da
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for the new cluster client.
+ */
+public interface NewClusterClient {
+
+ /**
+ * Submit the given {@link JobGraph} to the cluster.
+ *
+ * @param jobGraph to submit
+ * @return Future which is completed with the {@link JobSubmissionResult}
+ */
+ CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
+
+ /**
+ * Request the {@link JobResult} for the given {@link JobID}.
+ *
+ * @param jobId for which to request the {@link JobResult}
+ * @return Future which is completed with the {@link JobResult}
+ */
+ CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/72227451/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index cf68374..4a4f993 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.NewClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
@@ -101,6 +102,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import akka.actor.AddressFromURIString;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -126,7 +128,7 @@ import java.util.stream.Collectors;
/**
* A {@link ClusterClient} implementation that communicates via HTTP REST requests.
*/
-public class RestClusterClient<T> extends ClusterClient<T> {
+public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
private final RestClusterClientConfiguration restClusterClientConfiguration;
@@ -235,16 +237,14 @@ public class RestClusterClient<T> extends ClusterClient<T> {
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job {}.", jobGraph.getJobID());
- final CompletableFuture<JobSubmitResponseBody> jobSubmissionFuture = submitJob(jobGraph);
+ final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
if (isDetached()) {
try {
- jobSubmissionFuture.get();
+ return jobSubmissionFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e));
}
-
- return new JobSubmissionResult(jobGraph.getJobID());
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
ignored -> requestJobResult(jobGraph.getJobID()));
@@ -286,7 +286,8 @@ public class RestClusterClient<T> extends ClusterClient<T> {
* @return Future which is completed with the {@link JobResult} once the job has completed or
* with a failure if the {@link JobResult} could not be retrieved.
*/
- public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+ @Override
+ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
return pollResourceAsync(
() -> {
final JobMessageParameters messageParameters = new JobMessageParameters();
@@ -305,7 +306,8 @@ public class RestClusterClient<T> extends ClusterClient<T> {
* @param jobGraph to submit
* @return Future which is completed with the submission response
*/
- public CompletableFuture<JobSubmitResponseBody> submitJob(JobGraph jobGraph) {
+ @Override
+ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
@@ -346,10 +348,13 @@ public class RestClusterClient<T> extends ClusterClient<T> {
}
});
- return submissionFuture.exceptionally(
- (Throwable throwable) -> {
- throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", throwable));
- });
+ return submissionFuture
+ .thenApply(
+ (JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
+ .exceptionally(
+ (Throwable throwable) -> {
+ throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", throwable));
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/72227451/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 59e5ff0..2e826eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
@@ -573,18 +574,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
- final DispatcherGateway currentDispatcherGateway;
- try {
- currentDispatcherGateway = getDispatcherGateway();
- } catch (LeaderRetrievalException e) {
- throw new JobExecutionException(job.getJobID(), e);
- }
-
- // we have to allow queued scheduling in the new mode because we need to request slots
- // from the ResourceManager
- job.setAllowQueuedScheduling(true);
-
- final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+ final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
try {
submissionFuture.get();
@@ -607,21 +597,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
- final DispatcherGateway currentDispatcherGateway;
- try {
- currentDispatcherGateway = getDispatcherGateway();
- } catch (LeaderRetrievalException e) {
- throw new JobExecutionException(job.getJobID(), e);
- }
-
- // we have to allow queued scheduling in the new mode because we need to request slots
- // from the ResourceManager
- job.setAllowQueuedScheduling(true);
-
- final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+ final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
- (Acknowledge ack) -> currentDispatcherGateway.requestJobResult(job.getJobID(), RpcUtils.INF_TIMEOUT));
+ (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
final JobResult jobResult;
@@ -640,6 +619,37 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+ final DispatcherGateway dispatcherGateway;
+ try {
+ dispatcherGateway = getDispatcherGateway();
+ } catch (LeaderRetrievalException | InterruptedException e) {
+ ExceptionUtils.checkInterrupted(e);
+ return FutureUtils.completedExceptionally(e);
+ }
+
+ // we have to allow queued scheduling in Flip-6 mode because we need to request slots
+ // from the ResourceManager
+ jobGraph.setAllowQueuedScheduling(true);
+
+ final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+
+ return acknowledgeCompletableFuture.thenApply(
+ (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
+ }
+
+ public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+ final DispatcherGateway dispatcherGateway;
+ try {
+ dispatcherGateway = getDispatcherGateway();
+ } catch (LeaderRetrievalException | InterruptedException e) {
+ ExceptionUtils.checkInterrupted(e);
+ return FutureUtils.completedExceptionally(e);
+ }
+
+ return dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
+ }
+
private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
[2/3] flink git commit: [FLINK-9094] [tests] Harden
AccumulatorLiveITCase
Posted by tr...@apache.org.
[FLINK-9094] [tests] Harden AccumulatorLiveITCase
The problem was that we did not wait for the job to shut down properly before
resetting the latches. This could lead to a deadlock.
This closes #5771.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c3d9b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c3d9b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c3d9b0
Branch: refs/heads/master
Commit: 78c3d9b0c657bf06a712ce453edb02da13fa3acf
Parents: 7222745
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 27 09:22:08 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 14:49:17 2018 +0200
----------------------------------------------------------------------
.../test/accumulators/AccumulatorLiveITCase.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/78c3d9b0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 302fe3e..379e8ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -140,10 +141,16 @@ public class AccumulatorLiveITCase extends TestLogger {
private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ final ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- client.setDetached(true);
- client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+ final CheckedThread submissionThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+ }
+ };
+
+ submissionThread.start();
try {
NotifyingMapper.notifyLatch.await();
@@ -167,6 +174,9 @@ public class AccumulatorLiveITCase extends TestLogger {
NotifyingMapper.shutdownLatch.trigger();
} finally {
NotifyingMapper.shutdownLatch.trigger();
+
+ // wait for the job to have terminated
+ submissionThread.sync();
}
}
[3/3] flink git commit: [FLINK-6567] [tests] Harden
ExecutionGraphMetricsTest
Posted by tr...@apache.org.
[FLINK-6567] [tests] Harden ExecutionGraphMetricsTest
The problem was that in some cases the currentRestartingTime would not increase
because the loop iterations ran too fast for the clock to advance between them.
This closes #5782.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db366cd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db366cd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db366cd3
Branch: refs/heads/master
Commit: db366cd3d02a823f93185f29ca7ae93da9e2a04b
Parents: 78c3d9b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 28 15:35:11 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 14:49:53 2018 +0200
----------------------------------------------------------------------
.../ExecutionGraphMetricsTest.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db366cd3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 63b3238..ec0d2e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -120,6 +120,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
assertEquals(JobStatus.RUNNING, executionGraph.getState());
assertEquals(0L, restartingTime.getValue().longValue());
+ // add some pause such that RUNNING and RESTARTING timestamps are not the same
+ Thread.sleep(1L);
+
// fail the job so that it goes into state restarting
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
@@ -129,13 +132,13 @@ public class ExecutionGraphMetricsTest extends TestLogger {
long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
- // wait some time so that the restarting time gauge shows a value different from 0
- Thread.sleep(50);
-
long previousRestartingTime = restartingTime.getValue();
// check that the restarting time is monotonically increasing
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 2; i++) {
+ // add some pause to let the currentRestartingTime increase
+ Thread.sleep(1L);
+
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime >= previousRestartingTime);
@@ -165,13 +168,16 @@ public class ExecutionGraphMetricsTest extends TestLogger {
previousRestartingTime = restartingTime.getValue();
// check that the restarting time does not increase after we've reached the running state
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 2; i++) {
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime == previousRestartingTime);
previousRestartingTime = currentRestartingTime;
}
+ // add some pause such that the RUNNING and RESTARTING timestamps are not the same
+ Thread.sleep(1L);
+
// fail job again
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
@@ -183,12 +189,12 @@ public class ExecutionGraphMetricsTest extends TestLogger {
assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
- Thread.sleep(50);
-
previousRestartingTime = restartingTime.getValue();
// check that the restarting time is increasing again
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 2; i++) {
+ // add some pause to the let currentRestartingTime increase
+ Thread.sleep(1L);
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime >= previousRestartingTime);