You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/13 18:23:34 UTC
[4/5] flink git commit: [FLINK-1879] [client] Simplify JobClient.
Hide actorRefs behind method calls where possible.
[FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind method calls where possible.
- Drop redundant routing actor
- Consistently set the flag to subscribe to updates or not.
- Scala style cleanups: Drop default values for some method parameters.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad637077
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad637077
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad637077
Branch: refs/heads/master
Commit: ad6370772ff863003dc7247dc2123e10ea1c590b
Parents: f81d9f0
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 12:50:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 13 16:38:21 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/LocalExecutor.java | 11 +-
.../org/apache/flink/client/program/Client.java | 50 ++--
.../apache/flink/client/program/ClientTest.java | 10 +-
.../apache/flink/runtime/client/JobClient.java | 256 ++++++++++++++++
.../flink/runtime/client/JobClientActor.java | 150 ++++++++++
.../apache/flink/runtime/client/JobClient.scala | 294 -------------------
.../runtime/messages/JobClientMessages.scala | 28 +-
.../runtime/messages/JobManagerMessages.scala | 4 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 33 ++-
.../minicluster/LocalFlinkMiniCluster.scala | 20 +-
.../SlotCountExceedingParallelismTest.java | 14 +-
.../ScheduleOrUpdateConsumersTest.java | 12 +-
.../jobmanager/CoLocationConstraintITCase.scala | 2 +-
.../runtime/jobmanager/JobManagerITCase.scala | 28 +-
.../runtime/jobmanager/RecoveryITCase.scala | 6 +-
.../runtime/jobmanager/SlotSharingITCase.scala | 4 +-
.../TaskManagerFailsWithSlotSharingITCase.scala | 4 +-
.../flink/streaming/util/ClusterUtil.java | 7 +-
.../streaming/util/TestStreamEnvironment.java | 9 +-
.../flink/test/util/RecordAPITestBase.java | 8 +-
.../apache/flink/test/util/TestEnvironment.java | 9 +-
.../test/cancelling/CancellingTestBase.java | 50 ++--
.../JobSubmissionFailsITCase.java | 221 ++++++--------
.../test/failingPrograms/TaskFailureITCase.java | 4 +-
.../AbstractProcessFailureRecoveryTest.java | 6 +-
.../apache/flink/test/util/FailingTestBase.java | 30 +-
.../jobmanager/JobManagerFailsITCase.scala | 4 +-
.../taskmanager/TaskManagerFailsITCase.scala | 8 +-
28 files changed, 678 insertions(+), 604 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 22e17d0..d327d6f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -16,12 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
import java.util.List;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
@@ -29,7 +27,6 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -180,12 +177,8 @@ public class LocalExecutor extends PlanExecutor {
JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
-
- ActorRef jobClient = flink.getJobClient();
-
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient, flink.timeout());
-
+
+ SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 80bdcb8..b4e5af1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -49,10 +49,10 @@ import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -316,27 +316,41 @@ public class Client {
public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();
- final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- if (hostname == null) {
- throw new ProgramInvocationException("Could not find hostname of job manager.");
- }
-
- FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+ InetSocketAddress jobManagerAddress;
+ try {
+ jobManagerAddress = JobClient.getJobManagerAddress(configuration);
+ }
+ catch (IOException e) {
+ throw new ProgramInvocationException(e.getMessage(), e);
+ }
+ LOG.info("JobManager actor system address is " + jobManagerAddress);
+
+ LOG.info("Starting client actor system");
final ActorSystem actorSystem;
- final ActorRef client;
-
try {
- Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration, false);
- actorSystem = pair._1();
- client = pair._2();
+ actorSystem = JobClient.startJobClientActorSystem(configuration);
}
catch (Exception e) {
- throw new ProgramInvocationException("Could not build up connection to JobManager.", e);
+ throw new ProgramInvocationException("Could start client actor system.", e);
+ }
+
+ LOG.info("Looking up JobManager");
+ ActorRef jobManager;
+ try {
+ jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
}
+ catch (IOException e) {
+ throw new ProgramInvocationException("Failed to resolve JobManager", e);
+ }
+ LOG.info("JobManager runs at " + jobManager.path());
+
+ FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+ LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
+ LOG.info("Checking and uploading JAR files");
try {
- JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
+ JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
}
catch (IOException e) {
throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
@@ -344,8 +358,8 @@ public class Client {
try{
if (wait) {
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
+ SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem,
+ jobManager, jobGraph, timeout, printStatusDuringExecution);
try {
return result.toJobExecutionResult(this.userCodeClassLoader);
}
@@ -355,8 +369,8 @@ public class Client {
}
}
else {
- JobClient.submitJobDetached(jobGraph, client, timeout);
- // return a "Fake" execution result with the JobId
+ JobClient.submitJobDetached(jobManager, jobGraph, timeout);
+ // return a dummy execution result with the JobId
return new JobSubmissionResult(jobGraph.getJobID());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 22865ed..9a37dde 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -23,7 +23,6 @@ import akka.actor.Props;
import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -38,6 +37,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;
@@ -224,7 +224,13 @@ public class ClientTest {
@Override
public void onReceive(Object message) throws Exception {
- getSender().tell(new Status.Success(new JobID()), getSelf());
+ // Acknowledge SubmitJob with Status.Success carrying the submitted graph's JobID; answer any other message with Status.Failure.
+ if (message instanceof JobManagerMessages.SubmitJob) {
+ JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
+ getSender().tell(new Status.Success(jid), getSelf());
+ }
+ else {
+ getSender().tell(new Status.Failure(new Exception("Unknown message " + message)), getSelf());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
new file mode 100644
index 0000000..aa03491
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The JobClient bridges between the JobManager's asynchronous actor messages and
+ * synchronous, blocking method calls: it resolves the JobManager's address, starts the
+ * client's actor system, uploads a job's JAR files, and submits jobs either detached or
+ * waiting for their result.
+ */
+public class JobClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
+
+	/**
+	 * Starts the actor system through which the client communicates with the JobManager.
+	 * The actor system is created with remoting settings for an arbitrary port (port 0);
+	 * the address it actually bound to is logged.
+	 *
+	 * @param config The configuration from which the actor system's settings are taken.
+	 * @return The started actor system. This method never shuts it down, so the caller must.
+	 * @throws IOException As declared; presumably signals that the actor system could not be
+	 *                     started (depends on AkkaUtils - confirm).
+	 */
+	public static ActorSystem startJobClientActorSystem(Configuration config)
+			throws IOException {
+		LOG.info("Starting JobClient actor system");
+		Option<Tuple2<String, Object>> remoting =
+				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+		// start a remote actor system to listen on an arbitrary port
+		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
+		Address address = system.provider().getDefaultAddress();
+
+		String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+		int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
+		LOG.info("Started JobClient actor system at " + host + ':' + port);
+
+		return system;
+	}
+
+	/**
+	 * Reads the JobManager's host name and RPC port from the configuration and resolves them
+	 * to a socket address. If no port is configured, the default JobManager IPC port is used.
+	 *
+	 * @param config Configuration object containing all user provided configuration values
+	 * @return The socket address of the JobManager actor system
+	 * @throws IOException Thrown, if the configured JobManager host name cannot be resolved.
+	 * @throws RuntimeException Thrown, if no JobManager address is set in the configuration.
+	 */
+	public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
+
+		String jobManagerAddress = config.getString(
+				ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+		int jobManagerRPCPort = config.getInteger(
+				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+		if (jobManagerAddress == null) {
+			throw new RuntimeException(
+					"JobManager address has not been specified in the configuration.");
+		}
+
+		try {
+			return new InetSocketAddress(
+					InetAddress.getByName(jobManagerAddress), jobManagerRPCPort);
+		}
+		catch (UnknownHostException e) {
+			throw new IOException("Cannot resolve JobManager hostname " + jobManagerAddress, e);
+		}
+	}
+
+	/**
+	 * Submits the given {@link JobGraph} to the JobManager and blocks until the job has finished.
+	 * For this call a dedicated {@link JobClientActor} is created. It sends the job to the
+	 * JobManager, logs (and optionally prints) status updates, and watches the JobManager so
+	 * that a lost connection ends the wait with an exception. The actor is stopped when this
+	 * method returns.
+	 *
+	 * <p>The wait for the job result is deliberately not bounded by {@code timeout}, because a
+	 * job may run for an arbitrarily long time.
+	 *
+	 * @param actorSystem The actor system in which the client actor is created.
+	 * @param jobManager The JobManager that should execute the job.
+	 * @param jobGraph JobGraph describing the Flink job
+	 * @param timeout Timeout for futures; must not be null.
+	 * @param sysoutLogUpdates Whether the job's status updates are also printed to standard out.
+	 * @return The job execution result
+	 * @throws JobExecutionException Thrown if the job execution fails or the communication
+	 *                               with the JobManager fails.
+	 */
+	public static SerializedJobExecutionResult submitJobAndWait(ActorSystem actorSystem, ActorRef jobManager,
+			JobGraph jobGraph, FiniteDuration timeout,
+			boolean sysoutLogUpdates) throws JobExecutionException
+	{
+		if (actorSystem == null || jobManager == null || jobGraph == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		// for this job, we create a proxy JobClientActor that deals with all communication with
+		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
+		// update messages, watches for disconnect between client and JobManager, ...
+
+		Props jobClientActorProps = Props.create(JobClientActor.class, jobManager, LOG, sysoutLogUpdates);
+		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
+
+		try {
+			Future<Object> future = Patterns.ask(jobClientActor,
+					new JobClientMessages.SubmitJobAndWait(jobGraph),
+					new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+			// The job may run arbitrarily long: bounding this wait by the RPC 'timeout' would fail
+			// every job that runs longer than that. A lost JobManager is reported by the
+			// JobClientActor, which watches the JobManager.
+			Object answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
+
+			if (answer instanceof JobManagerMessages.JobResultSuccess) {
+				LOG.info("Job execution complete");
+
+				SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
+				if (result != null) {
+					return result;
+				} else {
+					throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
+				}
+			} else if (answer instanceof Status.Failure) {
+				throw ((Status.Failure) answer).cause();
+			} else {
+				throw new Exception("Unknown answer after submitting the job: " + answer);
+			}
+		}
+		catch (JobExecutionException e) {
+			throw e;
+		}
+		catch (TimeoutException e) {
+			throw new JobTimeoutException(jobGraph.getJobID(), "Lost connection to JobManager", e);
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Communication with JobManager failed: " + t.getMessage(), t);
+		}
+		finally {
+			// failsafe shutdown of the client actor
+			jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+	}
+
+	/**
+	 * Submits a job in detached mode. The method sends the JobGraph to the JobManager
+	 * and waits only for the answer whether the job could be started or not.
+	 *
+	 * @param jobManager The JobManager that should execute the job.
+	 * @param jobGraph The job
+	 * @param timeout Timeout in which the JobManager must have responded.
+	 * @throws JobTimeoutException Thrown, if the JobManager did not respond within the timeout.
+	 * @throws JobExecutionException Thrown, if the submission failed or the JobManager answered
+	 *                               unexpectedly.
+	 */
+	public static void submitJobDetached(ActorRef jobManager, JobGraph jobGraph, FiniteDuration timeout)
+			throws JobExecutionException {
+		if (jobManager == null || jobGraph == null || timeout == null) {
+			throw new NullPointerException();
+		}
+
+		Future<Object> future = Patterns.ask(jobManager,
+				new JobManagerMessages.SubmitJob(jobGraph, false),
+				new Timeout(timeout));
+		try {
+			Object result = Await.result(future, timeout);
+			if (result instanceof JobID) {
+				JobID respondedID = (JobID) result;
+				if (!respondedID.equals(jobGraph.getJobID())) {
+					throw new Exception("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() +
+							", response: " + respondedID);
+				}
+			}
+			else {
+				throw new Exception("Unexpected response: " + result);
+			}
+		}
+		catch (JobExecutionException e) {
+			throw e;
+		}
+		catch (TimeoutException e) {
+			throw new JobTimeoutException(jobGraph.getJobID(),
+					"JobManager did not respond within " + timeout.toString(), e);
+		}
+		catch (Throwable t) {
+			// keep the actual failure as the cause; t.getCause() is null for the exceptions thrown above
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Failed to send job to JobManager: " + t.getMessage(), t);
+		}
+	}
+
+	/**
+	 * Uploads the user-code JAR files of the given {@link JobGraph} to the BlobServer of the
+	 * JobManager, if the job has any. The BlobServer port is requested from the JobManager; the
+	 * host is taken from the JobManager's actor path, falling back to "localhost" if that path
+	 * has no host. This function issues a blocking call.
+	 *
+	 * @param jobGraph Flink job containing the information about the required jars
+	 * @param jobManager ActorRef of the JobManager.
+	 * @param timeout Timeout for futures
+	 * @throws IOException Thrown, if the blob port cannot be retrieved or the file upload to the
+	 *                     JobManager failed.
+	 */
+	public static void uploadJarFiles(JobGraph jobGraph, ActorRef jobManager, FiniteDuration timeout)
+			throws IOException {
+		if (jobGraph.hasUsercodeJarFiles()) {
+			Timeout tOut = new Timeout(timeout);
+			Future<Object> futureBlobPort = Patterns.ask(jobManager,
+					JobManagerMessages.getRequestBlobManagerPort(),
+					tOut);
+
+			int port;
+			try {
+				Object result = Await.result(futureBlobPort, timeout);
+				if (result instanceof Integer) {
+					port = (Integer) result;
+				} else {
+					throw new Exception("Expected port number (int) as answer, received " + result);
+				}
+			}
+			catch (Exception e) {
+				throw new IOException("Could not retrieve the JobManager's blob port.", e);
+			}
+
+			Option<String> jmHost = jobManager.path().address().host();
+			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
+
+			jobGraph.uploadRequiredJarFiles(serverAddress);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
new file mode 100644
index 0000000..ee31e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+
+/**
+ * Actor which constitutes the bridge between the non-actor code and the JobManager for a single
+ * job submission. It submits one job to the JobManager, logs (and optionally prints) the job's
+ * status updates, forwards the final result or failure to the original submitter, and watches
+ * the JobManager so that a lost connection is reported to the submitter.
+ */
+public class JobClientActor extends UntypedActor {
+
+	private final ActorRef jobManager;
+	private final Logger logger;
+	private final boolean sysoutUpdates;
+
+	/** The actor that sent the SubmitJobAndWait message; null until a job has been submitted. */
+	private ActorRef submitter;
+
+
+	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates) {
+		if (jobManager == null || logger == null) {
+			throw new NullPointerException();
+		}
+		this.jobManager = jobManager;
+		this.logger = logger;
+		this.sysoutUpdates = sysoutUpdates;
+	}
+
+	@Override
+	public void onReceive(Object message) {
+
+		// =========== State Change Messages ===============
+
+		if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
+			logAndPrintMessage(message);
+		}
+		else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
+			logAndPrintMessage(message);
+		}
+
+		// =========== Job Life Cycle Messages ===============
+
+		// submit a job to the JobManager
+		else if (message instanceof JobClientMessages.SubmitJobAndWait) {
+			// sanity check that no job was submitted through this actor before -
+			// it is a one-shot actor after all
+			if (this.submitter == null) {
+				JobGraph jobGraph = ((JobClientMessages.SubmitJobAndWait) message).jobGraph();
+				if (jobGraph == null) {
+					logger.error("Received null JobGraph");
+					sender().tell(new Status.Failure(new Exception("JobGraph is null")), getSelf());
+				}
+				else {
+					logger.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
+							jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+
+					this.submitter = getSender();
+					jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, true), getSelf());
+
+					// make sure we notify the sender when the connection got lost
+					getContext().watch(jobManager);
+				}
+			}
+			else {
+				// repeated submission - tell failure to sender and kill self
+				String msg = "Received repeated 'SubmitJobAndWait'";
+				logger.error(msg);
+				getSender().tell(new Status.Failure(new Exception(msg)), ActorRef.noSender());
+
+				// the original submitter would otherwise never get an answer from this
+				// (now stopping) actor, so it is failed as well
+				if (!this.submitter.equals(getSender())) {
+					this.submitter.tell(new Status.Failure(new Exception(msg)), getSelf());
+				}
+
+				getContext().unwatch(jobManager);
+				getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+		else if (message instanceof JobManagerMessages.JobResultSuccess) {
+			// forward the success to the original job submitter
+			logger.debug("Received JobResultSuccess message from JobManager");
+			if (this.submitter != null) {
+				this.submitter.tell(message, getSelf());
+			}
+
+			// we are done, stop ourselves
+			getContext().unwatch(jobManager);
+			getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+		else if (message instanceof Status.Success) {
+			// acknowledgement of the job submission is only logged, our original
+			// submitter is only interested in the final job result
+			logger.info("Job was successfully submitted to the JobManager");
+		}
+		else if (message instanceof Status.Failure) {
+			// the JobManager reported a failure, inform the actor that submitted the job
+			logger.debug("Received failure from JobManager", ((Status.Failure) message).cause());
+			if (submitter != null) {
+				submitter.tell(message, sender());
+			}
+		}
+
+		// =========== Actor / Communication Failure ===============
+
+		else if (message instanceof Terminated) {
+			ActorRef target = ((Terminated) message).getActor();
+			if (jobManager.equals(target)) {
+				// the JobManager is only watched after a submitter has been recorded
+				String msg = "Lost connection to JobManager " + jobManager.path();
+				logger.info(msg);
+				submitter.tell(new Status.Failure(new Exception(msg)), getSelf());
+			} else {
+				logger.error("Received 'Terminated' for unknown actor {}", target);
+			}
+		}
+
+		// =========== Unknown Messages ===============
+
+		else {
+			logger.error("JobClient received unknown message: {}", message);
+		}
+	}
+
+	/** Logs the message at info level and, if enabled, also prints it to standard out. */
+	private void logAndPrintMessage(Object message) {
+		logger.info(message.toString());
+		if (sysoutUpdates) {
+			System.out.println(message.toString());
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
deleted file mode 100644
index 013fe4c..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.client
-
-import java.io.IOException
-import java.net.{InetAddress, InetSocketAddress}
-
-import akka.actor.Status.{Success, Failure}
-import akka.actor._
-import akka.pattern.{Patterns, ask}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-
-import scala.concurrent.{TimeoutException, Await}
-import scala.concurrent.duration.FiniteDuration
-
-/**
- * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
- * is used to submit jobs to the JobManager and to request the port of the BlobManager.
- *
- * @param jobManager ActorRef to JobManager
- */
-class JobClient(jobManager: ActorRef) extends
-Actor with ActorLogMessages with ActorLogging {
-
- override def receiveWithLogMessages: Receive = {
-
- case SubmitJobDetached(jobGraph) =>
- jobManager forward SubmitJob(jobGraph, registerForEvents = false)
-
- case cancelJob: CancelJob =>
- jobManager forward cancelJob
-
- case SubmitJobAndWait(jobGraph, listen) =>
- val listener = context.actorOf(Props(classOf[JobClientListener], sender()))
- jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen), listener)
-
- case RequestBlobManagerPort =>
- jobManager forward RequestBlobManagerPort
-
- case RequestJobManagerStatus =>
- jobManager forward RequestJobManagerStatus
- }
-
- /**
- * Handle unmatched messages with an exception.
- */
- override def unhandled(message: Any): Unit = {
- // let the actor crash
- throw new RuntimeException("Received unknown message " + message)
- }
-}
-
-/**
- * Helper actor which listens to status messages from the JobManager and prints them on the
- * standard output. Such an actor is started for each job, which is configured to listen to these
- * status messages.
- *
- * @param jobSubmitter Akka URL of the sender of the job
- */
-class JobClientListener(jobSubmitter: ActorRef) extends Actor with ActorLogMessages with
-ActorLogging {
- override def receiveWithLogMessages: Receive = {
- case failure: Failure =>
- jobSubmitter ! failure
- self ! PoisonPill
-
- case Success(_) =>
-
- case JobResultSuccess(result) =>
- jobSubmitter ! result
- self ! PoisonPill
-
- case msg =>
- // we have to use System.out.println here to avoid erroneous behavior for output redirection
- System.out.println(msg.toString)
- }
-
- /**
- * Handle unmatched messages with an exception.
- */
- override def unhandled(message: Any): Unit = {
- // let the actor crash
- throw new RuntimeException("Received unknown message " + message)
- }
-}
-
-/**
- * JobClient's companion object containing convenience functions to start a JobClient actor, parse
- * the configuration to extract the JobClient's settings and convenience functions to submit jobs.
- */
-object JobClient {
-
- val JOB_CLIENT_NAME = "jobclient"
-
- @throws(classOf[IOException])
- def startActorSystemAndActor(config: Configuration,
- localActorSystem: Boolean): (ActorSystem, ActorRef) = {
-
- // start a remote actor system to listen on an arbitrary port
- val actorSystem = AkkaUtils.createActorSystem(configuration = config,
- listeningAddress = Some(("", 0)))
- try {
- val jobClientActor = createJobClientFromConfig(config, localActorSystem, actorSystem)
- (actorSystem, jobClientActor)
- }
- catch {
- case t: Throwable => {
- actorSystem.shutdown()
- throw t
- }
- }
- }
-
- @throws(classOf[IOException])
- def createJobClientFromConfig(config: Configuration,
- localActorSystem: Boolean,
- actorSystem: ActorSystem): ActorRef = {
-
- val jobManagerAddress = getJobManagerUrlFromConfig(config, localActorSystem)
- createJobClient(jobManagerAddress, actorSystem, config)
- }
-
- @throws(classOf[IOException])
- def createJobClient(jobManagerURL: String,
- actorSystem: ActorSystem,
- config: Configuration): ActorRef = {
-
- val timeout = AkkaUtils.getLookupTimeout(config)
- val jobManager = JobManager.getJobManagerRemoteReference(jobManagerURL, actorSystem, timeout)
-
- actorSystem.actorOf(Props(classOf[JobClient], jobManager), JOB_CLIENT_NAME)
- }
-
-
- /**
- * Extracts the JobManager's Akka URL from the configuration. If localActorSystem is true, then
- * the JobClient is executed in the same actor system as the JobManager. Thus, they can
- * communicate locally.
- *
- * @param configuration Configuration object containing all user provided configuration values
- * @param localActorSystem true if the JobClient runs in the same actor system as the JobManager,
- * otherwise false
- * @return Akka URL of the JobManager
- */
- def getJobManagerUrlFromConfig(configuration: Configuration,
- localActorSystem: Boolean): String = {
- if (localActorSystem) {
- // JobManager and JobClient run in the same ActorSystem
- JobManager.getLocalJobManagerAkkaURL
- } else {
- val jobManagerAddress = configuration.getString(
- ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-
- val jobManagerRPCPort = configuration.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if (jobManagerAddress == null) {
- throw new RuntimeException(
- "JobManager address has not been specified in the configuration.")
- }
-
- val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress),
- jobManagerRPCPort)
- JobManager.getRemoteJobManagerAkkaURL(hostPort)
- }
- }
-
- /**
- * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to
- * the JobManager. The method blocks until the job has finished or the JobManager is no longer
- * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter
- * case a [[JobExecutionException]] is thrown.
- *
- * @param jobGraph JobGraph describing the Flink job
- * @param listenToStatusEvents true if the JobClient shall print status events of the
- * corresponding job, otherwise false
- * @param jobClient ActorRef to the JobClient
- * @param timeout Timeout for futures
- * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
- * execution fails.
- * @return The job execution result
- */
- @throws(classOf[JobExecutionException])
- def submitJobAndWait(jobGraph: JobGraph,
- listenToStatusEvents: Boolean,
- jobClient: ActorRef,
- timeout: FiniteDuration): SerializedJobExecutionResult = {
-
- var waitForAnswer = true
- var answer: SerializedJobExecutionResult = null
-
- val result = (jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
- AkkaUtils.INF_TIMEOUT).mapTo[SerializedJobExecutionResult]
-
- while (waitForAnswer) {
- try {
- answer = Await.result(result, timeout)
- waitForAnswer = false
- } catch {
- case x: TimeoutException =>
- val jmStatus = (jobClient ? RequestJobManagerStatus)(timeout).mapTo[JobManagerStatus]
-
- try {
- Await.result(jmStatus, timeout)
- } catch {
- case t: Throwable =>
- throw new JobTimeoutException(jobGraph.getJobID, "Lost connection to " +
- "job manager.", t)
- }
- }
- }
-
- answer
- }
-
- /**
- * Submits a job in detached mode. The method sends the corresponding [[JobGraph]] to the
- * JobClient specified by jobClient. The JobClient does not start a [[JobClientListener]] and
- * simply returns a possible failure on the [[JobManager]].
- *
- * @param jobGraph Flink job
- * @param jobClient ActorRef to the JobClient
- * @param timeout Timeout for futures
- * @return The submission response
- */
- @throws(classOf[JobExecutionException])
- def submitJobDetached(jobGraph: JobGraph,
- jobClient: ActorRef,
- timeout: FiniteDuration): Unit = {
-
- val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
-
- try {
- Await.result(response, timeout)
- } catch {
- case timeout: TimeoutException =>
- throw new JobTimeoutException(jobGraph.getJobID,
- "Timeout while submitting the job to the JobManager.", timeout);
- }
- }
-
- /**
- * Uploads the specified jar files of the [[JobGraph]] jobGraph to the BlobServer of the
- * JobManager. The respective port is retrieved from the JobManager. This function issues a
- * blocking call.
- *
- * @param jobGraph Flink job containing the information about the required jars
- * @param hostname Hostname of the instance on which the BlobServer and also the JobManager run
- * @param jobClient ActorRef to the JobClient
- * @param timeout Timeout for futures
- * @throws IOException Thrown, if the file upload to the JobManager failed.
- */
- @throws(classOf[IOException])
- def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(
- implicit timeout: FiniteDuration): Unit = {
-
- if (jobGraph.hasUsercodeJarFiles()) {
- val futureBlobPort = Patterns.ask(jobClient, RequestBlobManagerPort, timeout).mapTo[Int]
-
- val port = try {
- Await.result(futureBlobPort, timeout)
- } catch {
- case e: Exception => throw new IOException("Could not retrieve the server's blob port.", e)
- }
-
- val serverAddress = new InetSocketAddress(hostname, port)
-
- jobGraph.uploadRequiredJarFiles(serverAddress)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index 425d7f8..e0dce35 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -26,26 +26,24 @@ import org.apache.flink.runtime.jobgraph.JobGraph
object JobClientMessages {
/**
- * This message submits a jobGraph to the JobClient which sends it to the JobManager. The
- * JobClient waits until the job has been executed. If listenToEvents is true,
- * then the JobClient prints all state change messages to the console. The
- * JobClient sends the result of the execution back to the sender. If the execution is
- * successful then a [[org.apache.flink.runtime.messages.JobManagerMessages.JobResult]] is sent
- * back. If a [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure]]
- * happens, then the cause is sent back to the sender().
+ * This message is sent to the JobClient (via ask) to submit a job and
+ * get a response when the job execution has finished.
+ *
+ * The response to this message is a
+ * [[org.apache.flink.runtime.client.SerializedJobExecutionResult]]
*
- * @param jobGraph containing the job description
- * @param listenToEvents if true then print state change messages
+ * @param jobGraph The job to be executed.
*/
- case class SubmitJobAndWait(jobGraph: JobGraph, listenToEvents: Boolean = false)
+ case class SubmitJobAndWait(jobGraph: JobGraph)
/**
- * This message submits a jobGraph to the JobClient which sends it to the JobManager. The
- * JobClient awaits the
- * [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse]]
- * from the JobManager and sends it back to the sender().
+ * This message is sent to the JobClient (via ask) to submit a job and
+ * return as soon as the result of the submit operation is known.
*
- * @param jobGraph containing the job description
+ * The response to this message is a
+ * [[org.apache.flink.api.common.JobSubmissionResult]]
+ *
+ * @param jobGraph The job to be executed.
*/
case class SubmitJobDetached(jobGraph: JobGraph)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5b70294..03e837d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -37,10 +37,10 @@ object JobManagerMessages {
* then the sender will be registered as listener for the state change messages.
* The submission result will be sent back to the sender as a success message.
*
- * @param jobGraph
+ * @param jobGraph The job to be submitted to the JobManager
* @param registerForEvents if true, then register for state change events
*/
- case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false)
+ case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean)
/**
* Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0e29345..8a6c394 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,11 +24,15 @@ import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
+import org.apache.flink.api.common.JobSubmissionResult
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
+import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.slf4j.LoggerFactory
+import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Await}
/**
@@ -162,10 +166,8 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
def awaitTermination(): Unit = {
jobManagerActorSystem.awaitTermination()
- if(!singleActorSystem) {
- taskManagerActorSystems foreach {
- _.awaitTermination()
- }
+ taskManagerActorSystems foreach {
+ _.awaitTermination()
}
}
@@ -178,4 +180,27 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
Await.ready(Future.sequence(futures), timeout)
}
+
+ @throws(classOf[JobExecutionException])
+ def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean)
+ : SerializedJobExecutionResult = {
+
+ submitJobAndWait(jobGraph, printUpdates, timeout)
+ }
+
+ @throws(classOf[JobExecutionException])
+ def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: FiniteDuration)
+ : SerializedJobExecutionResult = {
+
+ val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
+ else JobClient.startJobClientActorSystem(configuration)
+
+ JobClient.submitJobAndWait(clientActorSystem, jobManagerActor, jobGraph, timeout, printUpdates)
+ }
+
+ @throws(classOf[JobExecutionException])
+ def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
+ JobClient.submitJobDetached(jobManagerActor, jobGraph, timeout)
+ new JobSubmissionResult(jobGraph.getJobID)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 13e1ccd..d7cd6e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,11 +48,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
jobManagerActorSystem
} else {
// create an actor system listening on a random port
- AkkaUtils.createDefaultActorSystem()
+ JobClient.startJobClientActorSystem(configuration)
}
- var jobClient: Option[ActorRef] = None
-
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val config = getDefaultConfig
@@ -114,22 +112,6 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
classOf[TaskManager])
}
- def getJobClient(): ActorRef = {
- jobClient match {
- case Some(jc) => jc
- case None =>
- val config = new Configuration()
-
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
-
- val jc = JobClient.createJobClientFromConfig(config, singleActorSystem,
- jobClientActorSystem)
- jobClient = Some(jc)
- jc
- }
- }
-
def getJobClientActorSystem: ActorSystem = jobClientActorSystem
def getJobManagerRPCPort: Int = {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index f984ca9..b404aae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.jobmanager;
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -45,7 +43,6 @@ public class SlotCountExceedingParallelismTest {
private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
private static TestingCluster flink;
- private static ActorRef jobClient;
@BeforeClass
public static void setUp() throws Exception {
@@ -53,11 +50,6 @@ public class SlotCountExceedingParallelismTest {
NUMBER_OF_SLOTS_PER_TM,
NUMBER_OF_TMS,
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-
- jobClient = JobClient.createJobClientFromConfig(
- flink.configuration(),
- true,
- flink.jobManagerActorSystem());
}
@AfterClass
@@ -85,11 +77,7 @@ public class SlotCountExceedingParallelismTest {
// ---------------------------------------------------------------------------------------------
private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
- JobClient.submitJobAndWait(
- jobGraph,
- false,
- jobClient,
- TestingUtils.TESTING_DURATION());
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
}
private JobGraph createTestJobGraph(
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index ade14a1..08f8bfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,9 +18,8 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import akka.actor.ActorRef;
import com.google.common.collect.Lists;
-import org.apache.flink.runtime.client.JobClient;
+
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -46,7 +46,6 @@ public class ScheduleOrUpdateConsumersTest {
private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
private static TestingCluster flink;
- private static ActorRef jobClient;
@BeforeClass
public static void setUp() throws Exception {
@@ -54,11 +53,6 @@ public class ScheduleOrUpdateConsumersTest {
NUMBER_OF_SLOTS_PER_TM,
NUMBER_OF_TMS,
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-
- jobClient = JobClient.createJobClientFromConfig(
- flink.configuration(),
- true,
- flink.jobManagerActorSystem());
}
@AfterClass
@@ -122,7 +116,7 @@ public class ScheduleOrUpdateConsumersTest {
pipelinedReceiver,
blockingReceiver);
- JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
+ flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
}
// ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 28be994..07f0ce5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -69,7 +69,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with WrapA
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0fa3d5a..ee584f0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -71,7 +71,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
availableSlots should equal(1)
within(2 second) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
val success = expectMsgType[Success]
@@ -116,7 +116,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
availableSlots should equal(num_tasks)
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val result = expectMsgType[JobResultSuccess]
@@ -146,7 +146,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -181,7 +181,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -216,7 +216,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -253,7 +253,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -297,7 +297,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
@@ -341,7 +341,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -380,7 +380,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
}
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -428,7 +428,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
}
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -467,7 +467,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -509,7 +509,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -556,7 +556,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
val failure = expectMsgType[Failure]
@@ -595,7 +595,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try{
within(TestingUtils.TESTING_DURATION){
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 53bc70c..dfc650e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -78,7 +78,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION){
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -121,7 +121,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION){
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
@@ -165,7 +165,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION){
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index d719dc3..faff2f2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -65,7 +65,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
@@ -108,7 +108,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
val jm = cluster.getJobManager
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index dcfc899..39543f7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -69,7 +69,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
try{
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
@@ -118,7 +118,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
try{
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 85563e9..64b7bd8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -20,15 +20,12 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-
public class ClusterUtil {
private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
@@ -60,10 +57,8 @@ public class ClusterUtil {
try {
exec = new LocalFlinkMiniCluster(configuration, true);
- ActorRef jobClient = exec.getJobClient();
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
+ SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true);
return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 4dac980..0ff5c56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -70,15 +70,12 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
executor = new ForkableFlinkMiniCluster(configuration);
}
try {
- ActorRef client = executor.getJobClient();
-
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
-
+
+ SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
latestResult = result.toJobExecutionResult(getClass().getClassLoader());
return latestResult;
}
- catch(JobExecutionException e) {
+ catch (JobExecutionException e) {
if (e.getMessage().contains("GraphConversionException")) {
throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 6acd5b0..9c6062e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.util;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.DataStatistics;
@@ -28,9 +26,9 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
+
import org.junit.Assert;
import org.junit.Test;
@@ -121,9 +119,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
- ActorRef client = this.executor.getJobClient();
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+ SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
this.jobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index eaf9854..cf1caeb 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.util;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,9 +27,9 @@ import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
+
import org.junit.Assert;
public class TestEnvironment extends ExecutionEnvironment {
@@ -52,10 +51,8 @@ public class TestEnvironment extends ExecutionEnvironment {
JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
-
- ActorRef client = this.executor.getJobClient();
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+
+ SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
this.latestResult = result.toJobExecutionResult(getClass().getClassLoader());
return this.latestResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 345bffd..a3186a7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -19,17 +19,12 @@
package org.apache.flink.test.cancelling;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
@@ -44,11 +39,9 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileSystem;
+
import org.junit.After;
import org.junit.Before;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
/**
*
@@ -106,33 +99,46 @@ public abstract class CancellingTestBase {
runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
}
- public void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+ public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
try {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
- final ActorRef client = this.executor.getJobClient();
- final ActorSystem actorSystem = executor.getJobClientActorSystem();
+
+ final Thread currentThread = Thread.currentThread();
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
boolean jobSuccessfullyCancelled = false;
- Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
- (jobGraph, false), new Timeout(AkkaUtils.getDefaultTimeout()));
-
- actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
- TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
- actorSystem.dispatcher(), ActorRef.noSender());
+ // trigger the cancellation asynchronously: start() (not run()) so that the submission below proceeds concurrently
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(msecsTillCanceling);
+ executor.getJobManager().tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()), ActorRef.noSender());
+ }
+ catch (Throwable t) {
+ error.set(t);
+ currentThread.interrupt();
+ }
+ }
+ }.start();
try {
- Await.result(result, AkkaUtils.getDefaultTimeout());
- } catch (JobCancellationException exception) {
+ executor.submitJobAndWait(jobGraph, false);
+ }
+ catch (JobCancellationException exception) {
jobSuccessfullyCancelled = true;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new IllegalStateException("Job failed.", e);
}
if (!jobSuccessfullyCancelled) {
throw new IllegalStateException("Job was not successfully cancelled.");
}
- }catch(Exception e){
+ }
+ catch(Exception e) {
LOG.error("Exception found in runAndCancelJob.", e);
Assert.fail(StringUtils.stringifyException(e));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index a54bd97..ca1c304 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -18,12 +18,9 @@
package org.apache.flink.test.failingPrograms;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -32,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -47,26 +45,44 @@ import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class JobSubmissionFailsITCase {
-
- private static ActorSystem system;
-
+
+ private static final int NUM_SLOTS = 20;
+
+ private static ForkableFlinkMiniCluster cluster;
private static JobGraph workingJobGraph;
@BeforeClass
public static void setup() {
- system = ActorSystem.create("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig());
-
- final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex.");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-
- workingJobGraph = new JobGraph("Working testing job", jobVertex);
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
+
+ cluster = new ForkableFlinkMiniCluster(config);
+
+ final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex.");
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ workingJobGraph = new JobGraph("Working testing job", jobVertex);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@AfterClass
public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
+ try {
+ cluster.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+
+ // --------------------------------------------------------------------------------------------
private boolean detached;
@@ -80,148 +96,101 @@ public class JobSubmissionFailsITCase {
new Boolean[]{true});
}
- private JobExecutionResult submitJob(JobGraph jobGraph, ActorRef jobClient) throws Exception {
+ // --------------------------------------------------------------------------------------------
+
+ private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
if (detached) {
- JobClient.submitJobDetached(jobGraph, jobClient, TestingUtils.TESTING_DURATION());
+ cluster.submitJobDetached(jobGraph);
return null;
}
else {
- SerializedJobExecutionResult result =
- JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
+ SerializedJobExecutionResult result = cluster.submitJobAndWait(
+ jobGraph, false, TestingUtils.TESTING_DURATION());
return result.toJobExecutionResult(getClass().getClassLoader());
}
}
@Test
public void testExceptionInInitializeOnMaster() {
- new JavaTestKit(system) {{
- final int numSlots = 20;
-
- final ForkableFlinkMiniCluster cluster =
- ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
- TestingUtils.TESTING_DURATION().toString());
-
- final ActorRef jobClient = cluster.getJobClient();
-
+ try {
final AbstractJobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
try {
- new Within(TestingUtils.TESTING_DURATION()) {
-
- @Override
- protected void run() {
- try {
- submitJob(failingJobGraph, jobClient);
- fail("Expected JobExecutionException.");
- } catch (JobExecutionException e) {
- assertEquals("Test exception.", e.getCause().getMessage());
- } catch (Throwable t) {
- fail("Caught wrong exception of type " + t.getClass() + ".");
- t.printStackTrace();
- }
-
- try {
- JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
- TestingUtils.TESTING_DURATION());
- } catch (Throwable t) {
- fail("Caught unexpected exception " + t.getMessage() + ".");
- }
- }
- };
- } finally {
- cluster.stop();
+ submitJob(failingJobGraph);
+ fail("Expected JobExecutionException.");
+ }
+ catch (JobExecutionException e) {
+ assertEquals("Test exception.", e.getCause().getMessage());
+ }
+ catch (Exception t) {
+ t.printStackTrace();
+ fail("Caught wrong exception of type " + t.getClass() + ".");
}
- }};
+
+ cluster.submitJobAndWait(workingJobGraph, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testSubmitEmptyJobGraph() {
- new JavaTestKit(system) {{
- final int numSlots = 20;
-
- final ForkableFlinkMiniCluster cluster =
- ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
- TestingUtils.TESTING_DURATION().toString());
-
- final ActorRef jobClient = cluster.getJobClient();
-
+ try {
final JobGraph jobGraph = new JobGraph("Testing job");
-
+
try {
- new Within(TestingUtils.TESTING_DURATION()) {
-
- @Override
- protected void run() {
- try {
- submitJob(jobGraph, jobClient);
- fail("Expected JobSubmissionException.");
- }
- catch (JobSubmissionException e) {
- assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- try {
- JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
- TestingUtils.TESTING_DURATION());
- } catch (Throwable t) {
- fail("Caught unexpected exception " + t.getMessage() + ".");
- }
- }
- };
- } finally {
- cluster.stop();
+ submitJob(jobGraph);
+ fail("Expected JobSubmissionException.");
+ }
+ catch (JobSubmissionException e) {
+ assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+ }
+ catch (Exception t) {
+ t.printStackTrace();
+ fail("Caught wrong exception of type " + t.getClass() + ".");
}
- }};
+
+ cluster.submitJobAndWait(workingJobGraph, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
-
+
@Test
public void testSubmitNullJobGraph() {
- new JavaTestKit(system) {{
- final int numSlots = 20;
-
- final ForkableFlinkMiniCluster cluster =
- ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
- TestingUtils.TESTING_DURATION().toString());
-
- final ActorRef jobClient = cluster.getJobClient();
-
+ try {
try {
- new Within(TestingUtils.TESTING_DURATION()) {
-
- @Override
- protected void run() {
- try {
- submitJob(null, jobClient);
- fail("Expected JobSubmissionException.");
- } catch (JobSubmissionException e) {
- assertEquals("JobGraph must not be null.", e.getMessage());
- } catch (Throwable t) {
- fail("Caught wrong exception of type " + t.getClass() + ".");
- t.printStackTrace();
- }
-
- try {
- JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
- TestingUtils.TESTING_DURATION());
- } catch (Throwable t) {
- fail("Caught unexpected exception " + t.getMessage() + ".");
- }
- }
- };
- } finally {
- cluster.stop();
+ submitJob(null);
+ fail("Expected NullPointerException.");
+ }
+ catch (NullPointerException e) {
+ // expected: submitting a null JobGraph is rejected with a NullPointerException
+ }
+ catch (Exception t) {
+ t.printStackTrace();
+ fail("Caught wrong exception of type " + t.getClass() + ".");
}
- }};
+
+ cluster.submitJobAndWait(workingJobGraph, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+ // --------------------------------------------------------------------------------------------
+
public static class FailingJobVertex extends AbstractJobVertex {
+ private static final long serialVersionUID = -6365291240199412135L;
+
public FailingJobVertex(final String msg) {
super(msg);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index 19cf611..a739855 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -39,7 +39,6 @@ import org.apache.flink.util.Collector;
/**
* Tests whether the system recovers from a runtime exception from the user code.
*/
-@SuppressWarnings("deprecation")
public class TaskFailureITCase extends FailingTestBase {
private static final int parallelism = 4;
@@ -51,7 +50,8 @@ public class TaskFailureITCase extends FailingTestBase {
"1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
// expected result of working map job
- private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+ private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n" +
+ "3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
private String inputPath;
private String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 2901bf8..c102c1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -112,10 +112,10 @@ public abstract class AbstractProcessFailureRecoveryTest {
Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
- jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "2 s");
+ jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index e71e0bf..f3aa3e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -18,10 +18,8 @@
package org.apache.flink.test.util;
-import akka.actor.ActorRef;
import org.junit.Assert;
-import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -29,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
* Base class for integration tests which test whether the system recovers from failed executions.
*/
public abstract class FailingTestBase extends RecordAPITestBase {
+
/**
* Returns the {@link JobGraph} of the failing job.
*
@@ -116,13 +115,16 @@ public abstract class FailingTestBase extends RecordAPITestBase {
*/
private class SubmissionThread extends Thread {
- // reference to the timeout thread
+ /** reference to the timeout thread */
private final Thread timeoutThread;
- // cluster to submit the job to.
+
+ /** cluster to submit the job to. */
private final ForkableFlinkMiniCluster executor;
- // job graph of the failing job (submitted first)
+
+ /** job graph of the failing job (submitted first) */
private final JobGraph failingJob;
- // job graph of the working job (submitted after return from failing job)
+
+ /** job graph of the working job (submitted after return from failing job) */
private final JobGraph job;
private volatile Exception error;
@@ -142,24 +144,24 @@ public abstract class FailingTestBase extends RecordAPITestBase {
*/
@Override
public void run() {
- ActorRef client = this.executor.getJobClient();
-
try {
// submit failing job
- JobClient.submitJobAndWait(this.failingJob, false, client, executor.timeout());
-
+ this.executor.submitJobAndWait(this.failingJob, false);
this.error = new Exception("The job did not fail.");
- } catch(JobExecutionException jee) {
+ }
+ catch(JobExecutionException jee) {
// as expected
- } catch (Exception e) {
+ }
+ catch (Exception e) {
this.error = e;
}
try {
// submit working job
- JobClient.submitJobAndWait(this.job, false, client, executor.timeout());
- } catch (Exception e) {
+ this.executor.submitJobAndWait(this.job, false);
+ }
+ catch (Exception e) {
this.error = e;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 12e0e5b..3e17225 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -102,7 +102,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ jm ! SubmitJob(jobGraph, false)
expectMsg(Success(jobGraph.getJobID))
tm ! NotifyWhenJobManagerTerminated(jm)
@@ -117,7 +117,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
cluster.waitForTaskManagersToBeRegistered()
- jm ! SubmitJob(jobGraph2)
+ jm ! SubmitJob(jobGraph2, false)
val failure = expectMsgType[Success]