You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 11:51:58 UTC
[02/11] flink git commit: [FLINK-5969] Augment
SavepointMigrationTestBase to catch failed jobs
[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3ccffcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3ccffcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3ccffcb
Branch: refs/heads/release-1.2
Commit: a3ccffcb0a1b3e936fff31a61dddf1cada8ab99b
Parents: f68f654
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 14:48:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:50:03 2017 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClient.java | 32 ++++++++++++++
.../utils/SavepointMigrationTestBase.java | 44 +++++++++++++++++---
2 files changed, 70 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3ccffcb/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8d0e841..ab4daa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -489,6 +489,38 @@ public abstract class ClusterClient {
}
/**
+ * Reattaches to a running job with the given job id.
+ *
+ * @param jobID The job id of the job to attach to
+ * @return The JobListeningContext for the jobID
+ * @throws JobExecutionException if an error occurs while monitoring the job execution
+ */
+ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+ final LeaderRetrievalService leaderRetrievalService;
+ try {
+ leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+ }
+
+ ActorGateway jobManagerGateway;
+ try {
+ jobManagerGateway = getJobManagerGateway();
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
+ }
+
+ return JobClient.attachToRunningJob(
+ jobID,
+ jobManagerGateway,
+ flinkConfig,
+ actorSystemLoader.get(),
+ leaderRetrievalService,
+ timeout,
+ printStatusDuringExecution);
+ }
+
+ /**
* Cancels a job identified by the job id.
* @param jobId the job id
* @throws Exception In case an error occurred.
http://git-wip-us.apache.org/repos/asf/flink/blob/a3ccffcb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 2395a4b..ed69858 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -18,6 +18,8 @@
package org.apache.flink.test.checkpointing.utils;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
import java.io.File;
import java.net.URI;
import java.net.URL;
@@ -25,13 +27,16 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -48,6 +53,7 @@ import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static junit.framework.Assert.fail;
@@ -103,7 +109,8 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
- protected void executeAndSavepoint(
+ @SafeVarargs
+ protected final void executeAndSavepoint(
StreamExecutionEnvironment env,
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {
@@ -194,11 +201,8 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {
- int parallelism = env.getParallelism();
-
// Retrieve the job manager
-
- ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+ Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -208,11 +212,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+ JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
boolean done = false;
while (DEADLINE.hasTimeLeft()) {
+
+ // try and get a job result, this will fail if the job already failed. Use this
+ // to get out of this loop
+ JobID jobId = jobSubmissionResult.getJobID();
+ FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
+
+ try {
+
+ Future<Object> future = clusterClient
+ .getJobManagerGateway()
+ .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
+
+ Object result = Await.result(future, timeout);
+
+ if (result instanceof JobManagerMessages.CurrentJobStatus) {
+ if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
+ Object jobResult = Await.result(
+ jobListeningContext.getJobResultFuture(),
+ Duration.apply(5, TimeUnit.SECONDS));
+ fail("Job failed: " + jobResult);
+ }
+ }
+ } catch (Exception e) {
+ fail("Could not connect to job: " + e);
+ }
+
Thread.sleep(100);
- Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+ Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
@@ -226,6 +257,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
break;
}
}
+
if (allDone) {
done = true;
break;