You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:12 UTC

[06/13] flink git commit: [FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs

[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1882c905
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1882c905
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1882c905

Branch: refs/heads/master
Commit: 1882c90505b0d25775b969cc025c8a3087b82f37
Parents: 6f8b3c6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 14:48:22 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 32 +++++++++++++++++++
 .../utils/SavepointMigrationTestBase.java       | 33 +++++++++++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8d0e841..ab4daa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -489,6 +489,38 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Reattaches to a running job with the given job id.
+	 *
+	 * @param jobID The job id of the job to attach to
+	 * @return The JobExecutionResult for the jobID
+	 * @throws JobExecutionException if an error occurs during monitoring the job execution
+	 */
+	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+		final LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+		}
+
+		ActorGateway jobManagerGateway;
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
+		}
+
+		return JobClient.attachToRunningJob(
+				jobID,
+				jobManagerGateway,
+				flinkConfig,
+				actorSystemLoader.get(),
+				leaderRetrievalService,
+				timeout,
+				printStatusDuringExecution);
+	}
+
+	/**
 	 * Cancels a job identified by the job id.
 	 * @param jobId the job id
 	 * @throws Exception In case an error occurred.

http://git-wip-us.apache.org/repos/asf/flink/blob/1882c905/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index fced68c..301fc72 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,14 +25,17 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -49,6 +52,7 @@ import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static junit.framework.Assert.fail;
@@ -207,11 +211,38 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
 
 		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
+
+			// try and get a job result, this will fail if the job already failed. Use this
+			// to get out of this loop
+			JobID jobId = jobSubmissionResult.getJobID();
+			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
+
+			try {
+
+				Future<Object> future = clusterClient
+						.getJobManagerGateway()
+						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
+
+				Object result = Await.result(future, timeout);
+
+				if (result instanceof JobManagerMessages.CurrentJobStatus) {
+					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
+						Object jobResult = Await.result(
+								jobListeningContext.getJobResultFuture(),
+								Duration.apply(5, TimeUnit.SECONDS));
+						fail("Job failed: " + jobResult);
+					}
+				}
+			} catch (Exception e) {
+				fail("Could not connect to job: " + e);
+			}
+
 			Thread.sleep(100);
-			Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+			Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {