You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:12 UTC
[06/13] flink git commit: [FLINK-5969] Augment
SavepointMigrationTestBase to catch failed jobs
[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1882c905
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1882c905
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1882c905
Branch: refs/heads/master
Commit: 1882c90505b0d25775b969cc025c8a3087b82f37
Parents: 6f8b3c6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 14:48:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClient.java | 32 +++++++++++++++++++
.../utils/SavepointMigrationTestBase.java | 33 +++++++++++++++++++-
2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8d0e841..ab4daa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -489,6 +489,38 @@ public abstract class ClusterClient {
}
/**
+ * Reattaches to a running job with the given job id.
+ *
+ * @param jobID The job id of the job to attach to
+ * @return The JobListeningContext for the running job
+ * @throws JobExecutionException if an error occurs during monitoring the job execution
+ */
+ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+ final LeaderRetrievalService leaderRetrievalService;
+ try {
+ leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+ }
+
+ ActorGateway jobManagerGateway;
+ try {
+ jobManagerGateway = getJobManagerGateway();
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
+ }
+
+ return JobClient.attachToRunningJob(
+ jobID,
+ jobManagerGateway,
+ flinkConfig,
+ actorSystemLoader.get(),
+ leaderRetrievalService,
+ timeout,
+ printStatusDuringExecution);
+ }
+
+ /**
* Cancels a job identified by the job id.
* @param jobId the job id
* @throws Exception In case an error occurred.
http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index fced68c..301fc72 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,14 +25,17 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -49,6 +52,7 @@ import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static junit.framework.Assert.fail;
@@ -207,11 +211,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+ JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
boolean done = false;
while (DEADLINE.hasTimeLeft()) {
+
+ // try to get a job result; this will fail if the job already failed. Use this
+ // to get out of this loop
+ JobID jobId = jobSubmissionResult.getJobID();
+ FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
+
+ try {
+
+ Future<Object> future = clusterClient
+ .getJobManagerGateway()
+ .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
+
+ Object result = Await.result(future, timeout);
+
+ if (result instanceof JobManagerMessages.CurrentJobStatus) {
+ if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
+ Object jobResult = Await.result(
+ jobListeningContext.getJobResultFuture(),
+ Duration.apply(5, TimeUnit.SECONDS));
+ fail("Job failed: " + jobResult);
+ }
+ }
+ } catch (Exception e) {
+ fail("Could not connect to job: " + e);
+ }
+
Thread.sleep(100);
- Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+ Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {