Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 11:51:58 UTC

[02/11] flink git commit: [FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs

[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3ccffcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3ccffcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3ccffcb

Branch: refs/heads/release-1.2
Commit: a3ccffcb0a1b3e936fff31a61dddf1cada8ab99b
Parents: f68f654
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 14:48:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:50:03 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 32 ++++++++++++++
 .../utils/SavepointMigrationTestBase.java       | 44 +++++++++++++++++---
 2 files changed, 70 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
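For context: the change below makes SavepointMigrationTestBase attach to the job it submits and poll the job's status, so a job that fails during the test is reported immediately instead of the test spinning on accumulator checks until the deadline expires. A condensed sketch of that pattern, distilled from the diff below (variable names such as clusterClient, jobID, deadline, and timeout follow the test code, the surrounding harness and imports are the same ones the test diff adds):

    // Attach to the detached job so its result can be retrieved if it fails.
    JobListeningContext listeningContext = clusterClient.connectToJob(jobID);

    while (deadline.hasTimeLeft()) {
        // Ask the JobManager for the current job status.
        Future<Object> statusFuture = clusterClient
                .getJobManagerGateway()
                .ask(JobManagerMessages.getRequestJobStatus(jobID), timeout);
        Object status = Await.result(statusFuture, timeout);

        // A FAILED status means the test should surface the failure right away.
        if (status instanceof JobManagerMessages.CurrentJobStatus
                && ((JobManagerMessages.CurrentJobStatus) status).status() == JobStatus.FAILED) {
            Object jobResult = Await.result(listeningContext.getJobResultFuture(), timeout);
            fail("Job failed: " + jobResult);
        }

        // ... otherwise check the expected accumulators and sleep briefly ...
        Thread.sleep(100);
    }
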


http://git-wip-us.apache.org/repos/asf/flink/blob/a3ccffcb/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8d0e841..ab4daa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -489,6 +489,38 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Reattaches to a running job with the given job id.
+	 *
+	 * @param jobID The job id of the job to attach to
+	 * @return The JobListeningContext for the given jobID
+	 * @throws JobExecutionException if the leader retrieval service or the JobManager gateway cannot be created
+	 */
+	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+		final LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+		}
+
+		ActorGateway jobManagerGateway;
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
+		}
+
+		return JobClient.attachToRunningJob(
+				jobID,
+				jobManagerGateway,
+				flinkConfig,
+				actorSystemLoader.get(),
+				leaderRetrievalService,
+				timeout,
+				printStatusDuringExecution);
+	}
+
+	/**
 	 * Cancels a job identified by the job id.
 	 * @param jobId the job id
 	 * @throws Exception In case an error occurred.

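Beyond the test base, connectToJob gives ClusterClient callers a way to reattach to a job that was submitted in detached mode and then wait for its outcome via the returned JobListeningContext. A minimal usage sketch, assuming an already configured clusterClient and a JobSubmissionResult from a detached submission (the variable names are illustrative, imports match those added in the test diff below):

    // Reattach to a job that was submitted in detached mode.
    JobID jobID = jobSubmissionResult.getJobID();
    JobListeningContext listeningContext = clusterClient.connectToJob(jobID);

    // Wait (up to the given timeout) for the job to reach a terminal state.
    Object jobResult = Await.result(
            listeningContext.getJobResultFuture(),
            FiniteDuration.apply(60, TimeUnit.SECONDS));
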
http://git-wip-us.apache.org/repos/asf/flink/blob/a3ccffcb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 2395a4b..ed69858 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.test.checkpointing.utils;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import java.io.File;
 import java.net.URI;
 import java.net.URL;
@@ -25,13 +27,16 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -48,6 +53,7 @@ import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static junit.framework.Assert.fail;
@@ -103,7 +109,8 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}
 
-	protected void executeAndSavepoint(
+	@SafeVarargs
+	protected final void executeAndSavepoint(
 			StreamExecutionEnvironment env,
 			String savepointPath,
 			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
@@ -194,11 +201,8 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 			String savepointPath,
 			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
 
-		int parallelism = env.getParallelism();
-
 		// Retrieve the job manager
-
-		ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -208,11 +212,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
 
 		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
+
+			// Check the current job status; if the job has already failed, retrieve its
+			// result and fail the test immediately instead of looping until the deadline.
+			JobID jobId = jobSubmissionResult.getJobID();
+			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
+
+			try {
+
+				Future<Object> future = clusterClient
+						.getJobManagerGateway()
+						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
+
+				Object result = Await.result(future, timeout);
+
+				if (result instanceof JobManagerMessages.CurrentJobStatus) {
+					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
+						Object jobResult = Await.result(
+								jobListeningContext.getJobResultFuture(),
+								Duration.apply(5, TimeUnit.SECONDS));
+						fail("Job failed: " + jobResult);
+					}
+				}
+			} catch (Exception e) {
+				fail("Could not connect to job: " + e);
+			}
+
 			Thread.sleep(100);
-			Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+			Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
@@ -226,6 +257,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 					break;
 				}
 			}
+
 			if (allDone) {
 				done = true;
 				break;