Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2016/07/29 16:51:13 UTC

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2313

    [FLINK-4273] Modify JobClient to attach to running jobs

    These changes are required for FLINK-4272 (introduce a JobClient class
    for job control). Essentially, we want to be able to re-attach to a
    running job and monitor it. It shouldn't make any difference whether we
    just submitted the job or we recover it from an existing JobID.
    
    This PR modifies the JobClientActor to support two different operation
    modes: a) submit a job and monitor it, b) re-attach to a running job
    and monitor it.
    
    The JobClient class has been updated with methods to access this
    functionality. Previously, it only had `submitJobAndWait` and
    `submitJobDetached`. It now also has `submitJob`,
    `attachToRunningJob`, and `awaitJobResult`.
    
    `submitJob` -> Submits the job and returns a future whose result
    can be obtained with `awaitJobResult`
    
    `attachToRunningJob` -> Re-attaches the JobClientActor to a running
    job, registering at the JobManager and downloading the job's class loader
    
    `awaitJobResult` -> Blocks until the future returned by either
    `submitJob` or `attachToRunningJob` has completed
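To make the two modes concrete, here is a plain-Java sketch of how a caller might use the three methods. It models the Akka future with `CompletableFuture`; `JobClientSketch`, `ListeningContext`, and the in-memory `RUNNING_JOBS` map are hypothetical stand-ins, not Flink's classes or signatures. What it illustrates is that after `submitJob` or `attachToRunningJob` the caller holds the same kind of handle, so `awaitJobResult` does not need to know which path produced it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class JobClientSketch {

    /** Hypothetical stand-in for JobListeningContext: the job id plus the future that will hold its result. */
    static final class ListeningContext {
        final String jobId;
        final CompletableFuture<String> resultFuture;

        ListeningContext(String jobId, CompletableFuture<String> resultFuture) {
            this.jobId = jobId;
            this.resultFuture = resultFuture;
        }
    }

    /** Hypothetical stand-in for the cluster side: job id -> eventual job result. */
    static final Map<String, CompletableFuture<String>> RUNNING_JOBS = new ConcurrentHashMap<>();

    /** Submits a job and returns immediately with a context that can be awaited later. */
    static ListeningContext submitJob(String jobId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        RUNNING_JOBS.put(jobId, result);
        return new ListeningContext(jobId, result);
    }

    /** Re-attaches to a job that is already running; the caller gets the same kind of context as after submission. */
    static ListeningContext attachToRunningJob(String jobId) {
        CompletableFuture<String> result = RUNNING_JOBS.get(jobId);
        if (result == null) {
            throw new IllegalStateException("Job " + jobId + " not found");
        }
        return new ListeningContext(jobId, result);
    }

    /** Blocks until the future from either submitJob or attachToRunningJob has completed. */
    static String awaitJobResult(ListeningContext context) {
        return context.resultFuture.join();
    }

    public static void main(String[] args) {
        ListeningContext submitted = submitJob("job-1");
        ListeningContext attached = attachToRunningJob("job-1");
        RUNNING_JOBS.get("job-1").complete("FINISHED");   // simulate the job finishing
        System.out.println(awaitJobResult(submitted) + " " + awaitJobResult(attached));   // prints FINISHED FINISHED
    }
}
```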
    
    TODO
    - missing: an integration test for downloading the user code class
    loader from the JobManager, and an end-to-end test of the
    re-attachment.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4273

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2313
    
----
commit 7832810728abea5824e2fe0d9e9dc75b14ef61af
Author: Maximilian Michels <mx...@apache.org>
Date:   2016-07-29T15:37:41Z

    [FLINK-4273] Modify JobClient to attach to running jobs
    

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75908291
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult awaitJobResult(JobListeningContext listeningCon
     		final JobID jobID = listeningContext.getJobID();
     		final ActorRef jobClientActor = listeningContext.getJobClientActor();
     		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    +		final FiniteDuration askTimeout = listeningContext.getTimeout();
     		// retrieves class loader if necessary
     		final ClassLoader classLoader = listeningContext.getClassLoader();
     
    +		// wait for the future which holds the result to be ready
    +		// ping the JobClientActor from time to time to check if it is still running
     		while (!jobSubmissionFuture.isCompleted()) {
     			try {
    -				Thread.sleep(250);
    -			} catch (InterruptedException e) {
    -				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    -			}
    -
    -			try {
    -				Await.result(
    -					Patterns.ask(
    -						jobClientActor,
    -						JobClientMessages.getPing(),
    -						Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -					AkkaUtils.getDefaultTimeout());
    +				Await.ready(jobSubmissionFuture, askTimeout);
     			} catch (Exception e) {
    --- End diff --
    
    Updated.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75645279
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    Can't we wait on the `jobSubmissionFuture` for, e.g., Flink's ask timeout and recover from timeout exceptions by sending an `Identify` message to the JobClient actor? If it replies in time, we wait on the `jobSubmissionFuture` again.
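The suggested wait/probe/re-wait loop might look like the plain-Java sketch below. `CompletableFuture.get(timeout, unit)` plays the role of waiting on the `jobSubmissionFuture` for the ask timeout, and the hypothetical `livenessProbe` stands in for sending `Identify` to the JobClient actor and checking for a timely reply; the class name and the error handling are assumptions, not Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class AwaitWithLivenessCheck {

    /**
     * Waits for resultFuture in slices of askTimeoutMillis. After each slice that times out,
     * livenessProbe is asked whether the actor responsible for completing the future is still there:
     * if yes, wait again; if no, give up instead of blocking forever.
     */
    static <T> T awaitResult(CompletableFuture<T> resultFuture, long askTimeoutMillis, BooleanSupplier livenessProbe) {
        while (true) {
            try {
                return resultFuture.get(askTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                if (!livenessProbe.getAsBoolean()) {
                    throw new IllegalStateException("JobClientActor did not answer the liveness check", e);
                }
                // The actor answered in time: loop and wait on the result future again.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for execution result", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Job failed", e.getCause());
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = new CompletableFuture<>();
        AtomicInteger probes = new AtomicInteger();
        // Pretend the job finishes while the third liveness check is being answered.
        String value = awaitResult(result, 50, () -> {
            if (probes.incrementAndGet() == 3) {
                result.complete("FINISHED");
            }
            return true;
        });
        System.out.println(value + " after " + probes.get() + " liveness checks");   // prints FINISHED after 3 liveness checks
    }
}
```

Compared with a fixed `Thread.sleep(250)`, the waiting thread returns as soon as the result is available and only probes the actor after a full timeout has passed without one.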



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r74756158
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    --- End diff --
    
    Is there a reason not to use the default "akka.ask.timeout" here?



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75460084
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    Yes, I think that would be good. The user code class loader should always be retrievable if the job is still running.



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    @tillrohrmann I've refactored the JobClientActor to include the common code in `JobClientActorBase` and have implementations for submitting/attaching in `JobSubmissionClientActor` and `JobAttachmentClientActor`.
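The split follows a template-method shape, which can be sketched in plain Java without Akka. The three class names are taken from the comment; `startJobInteraction`, `handleMessage`, and the string messages are hypothetical simplifications, not the actual actor protocol.

```java
import java.util.ArrayList;
import java.util.List;

public class JobClientActorHierarchy {

    /** Common behaviour: reacting to the connection, logging status updates, and recording the final result. */
    abstract static class JobClientActorBase {
        final List<String> log = new ArrayList<>();
        String result;

        /** Called once the JobManager connection is established; subclasses decide what to send. */
        protected abstract void startJobInteraction();

        /** Shared message handling for both operation modes. */
        final void handleMessage(String message) {
            if (message.equals("connected")) {
                startJobInteraction();
            } else if (message.startsWith("update:")) {
                log.add(message.substring("update:".length()));
            } else if (message.startsWith("result:")) {
                result = message.substring("result:".length());
            } else {
                log.add("unhandled: " + message);
            }
        }
    }

    /** Submits a new job and then monitors it. */
    static final class JobSubmissionClientActor extends JobClientActorBase {
        @Override
        protected void startJobInteraction() {
            log.add("submit job");
        }
    }

    /** Registers at the JobManager for an existing job and then monitors it. */
    static final class JobAttachmentClientActor extends JobClientActorBase {
        @Override
        protected void startJobInteraction() {
            log.add("attach to job");
        }
    }

    public static void main(String[] args) {
        JobClientActorBase actor = new JobAttachmentClientActor();
        for (String msg : new String[] {"connected", "update:RUNNING", "result:FINISHED"}) {
            actor.handleMessage(msg);
        }
        System.out.println(actor.log + " -> " + actor.result);   // prints [attach to job, RUNNING] -> FINISHED
    }
}
```

Only the step that differs between the two modes is abstract; everything else lives once in the base class, which is the kind of duplication between the submit and attach branches that this refactoring removes.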



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75644122
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    +			} catch (InterruptedException e) {
    +				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    +			}
    +
    +			try {
    +				Await.result(
    +					Patterns.ask(
    +						jobClientActor,
    +						JobClientMessages.getPing(),
    --- End diff --
    
    I think we can also use Akka's built-in message `Identify` to do the same. Then we don't have to introduce a new message type.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75905168
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult awaitJobResult(JobListeningContext listeningCon
     		final JobID jobID = listeningContext.getJobID();
     		final ActorRef jobClientActor = listeningContext.getJobClientActor();
     		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    +		final FiniteDuration askTimeout = listeningContext.getTimeout();
     		// retrieves class loader if necessary
     		final ClassLoader classLoader = listeningContext.getClassLoader();
     
    +		// wait for the future which holds the result to be ready
    +		// ping the JobClientActor from time to time to check if it is still running
     		while (!jobSubmissionFuture.isCompleted()) {
     			try {
    -				Thread.sleep(250);
    -			} catch (InterruptedException e) {
    -				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    -			}
    -
    -			try {
    -				Await.result(
    -					Patterns.ask(
    -						jobClientActor,
    -						JobClientMessages.getPing(),
    -						Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -					AkkaUtils.getDefaultTimeout());
    +				Await.ready(jobSubmissionFuture, askTimeout);
     			} catch (Exception e) {
    --- End diff --
    
    But at least an `IllegalArgumentException` should not trigger the pinging of the job client actor. This should be handled differently.
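One way to act on this is to let only a timeout lead to the liveness check and treat every other exception as a real failure. A hypothetical helper for that decision is sketched below in plain Java; the enum and its values are illustrative. The reasoning is that an `IllegalArgumentException` from the wait points at a programming or configuration error (for Scala's `Await.ready`, it is documented for an undefined wait duration) rather than at a slow job, so pinging the actor and retrying cannot fix it.

```java
import java.util.concurrent.TimeoutException;

public class AwaitFailureClassifier {

    /** What the waiting loop should do after waiting on the result future threw. */
    enum Action { PROBE_ACTOR_AND_RETRY, FAIL_INTERRUPTED, FAIL_IMMEDIATELY }

    static Action classify(Throwable failure) {
        if (failure instanceof TimeoutException) {
            // No answer yet: check that the JobClientActor is alive, then wait again.
            return Action.PROBE_ACTOR_AND_RETRY;
        } else if (failure instanceof InterruptedException) {
            // The caller wants to stop waiting; report it as an interruption.
            return Action.FAIL_INTERRUPTED;
        } else {
            // IllegalArgumentException and anything unexpected: retrying will not help.
            return Action.FAIL_IMMEDIATELY;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new TimeoutException()));                   // prints PROBE_ACTOR_AND_RETRY
        System.out.println(classify(new IllegalArgumentException("bad wait"))); // prints FAIL_IMMEDIATELY
    }
}
```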



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75482760
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    In addition to the result, you'll also need the class loader for getting accumulators of a running job. 
    
    I agree that it would be nice to fail when the class loader can't be reconstructed, but *only* if it is really the only option. So we could start off with the class loader set to `None` in the `JobListeningContext`. When the class loader is needed, i.e. for retrieving accumulators or the job execution result, it is fetched.
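A lazily fetched class loader could be sketched as follows in plain Java: the context starts with no class loader (`Optional.empty()`, the Java counterpart of `None`) and calls a fetch function the first time one is needed, caching the result. `LazyClassLoaderContext` and its `fetcher` are hypothetical; in the PR the fetch would presumably be the `retrieveClassLoader` call shown in the diff.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Optional;
import java.util.function.Supplier;

public class LazyClassLoaderContext {

    private final Supplier<ClassLoader> fetcher;
    // Starts out empty and is filled on first use.
    private Optional<ClassLoader> classLoader = Optional.empty();

    LazyClassLoaderContext(Supplier<ClassLoader> fetcher) {
        this.fetcher = fetcher;
    }

    /** Fetches the class loader on the first call (e.g. for accumulators or the job result) and caches it. */
    synchronized ClassLoader getClassLoader() {
        if (!classLoader.isPresent()) {
            // If fetching fails, the exception reaches the caller that actually needed the class loader.
            classLoader = Optional.of(fetcher.get());
        }
        return classLoader.get();
    }

    public static void main(String[] args) {
        int[] fetches = {0};
        LazyClassLoaderContext context = new LazyClassLoaderContext(() -> {
            fetches[0]++;
            return new URLClassLoader(new URL[0], LazyClassLoaderContext.class.getClassLoader());
        });
        System.out.println("fetches before use: " + fetches[0]);       // prints fetches before use: 0
        context.getClassLoader();
        context.getClassLoader();
        System.out.println("fetches after two uses: " + fetches[0]);   // prints fetches after two uses: 1
    }
}
```

A failed fetch is not cached, so the caller that needs the class loader sees the error and a later call can retry, while code paths that never ask for it never trigger the download.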



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75302372
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    +				}
    +			}
    +
    +			for (URL url : requiredClasspaths) {
    +				allURLs[pos++] = url;
    +			}
    +
    +			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +		} else {
    +			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +		}
    +	}
    +
    +	/**
    +	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to
    +	 * @param listeningContext The listening context of the job execution
    +	 * @return The result of the execution
    +	 * @throws JobExecutionException if anything goes wrong while monitoring the job
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    +
    +		final JobID jobID = listeningContext.jobID;
    +		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
    +		final ClassLoader classLoader = listeningContext.classLoader;
    +
     		// first block handles errors while waiting for the result
    -		Object answer;
    +		final Object answer;
     		try {
    -			Future<Object> future = Patterns.ask(jobClientActor,
    -					new JobClientMessages.SubmitJobAndWait(jobGraph),
    -					new Timeout(AkkaUtils.INF_TIMEOUT()));
    -			
    -			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
    +			answer = Await.result(jobSubmissionFuture, AkkaUtils.INF_TIMEOUT());
    --- End diff --
    
    What if the `JobClientActor` dies for some reason? Then it won't be able to complete or fail the future and we would be stuck. Maybe we could periodically check whether the `JobClientActor` is still alive to avoid this scenario.
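The periodic check could also run outside the waiting thread: a scheduled task probes the actor and, if it no longer responds, completes the result future exceptionally so that the blocked wait returns. The sketch below uses the JDK's `ScheduledExecutorService` and `CompletableFuture`; `ActorWatchdog` and the `isActorAlive` probe are hypothetical stand-ins for a real liveness check such as Akka's `Identify`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class ActorWatchdog {

    /**
     * Every periodMillis, asks isActorAlive; if the actor is gone while the result is still pending,
     * fails resultFuture so that anyone blocked on it is released. Probing stops once the future completes.
     */
    static ScheduledFuture<?> watch(CompletableFuture<?> resultFuture, BooleanSupplier isActorAlive,
                                    ScheduledExecutorService scheduler, long periodMillis) {
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            if (!resultFuture.isDone() && !isActorAlive.getAsBoolean()) {
                resultFuture.completeExceptionally(new IllegalStateException("JobClientActor died"));
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        resultFuture.whenComplete((value, error) -> task.cancel(false));
        return task;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> result = new CompletableFuture<>();
        watch(result, () -> false, scheduler, 20);   // simulate an actor that is already dead
        try {
            result.join();
        } catch (CompletionException e) {
            System.out.println("stopped waiting: " + e.getCause().getMessage());   // prints stopped waiting: JobClientActor died
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```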



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r74757794
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
     					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
     			}
     		}
    +		else if (message instanceof AttachToJobAndWait) {
    --- End diff --
    
    This branch seems to share a lot of code with the branch above. I wonder if there's a way to extract the common code into a method?



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75443084
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    --- End diff --
    
    There is a `ShutdownHook` in the BlobCache which does that. Additionally, we could pass the `BlobService` into the `SubmissionContext` and add a `cleanup()` method there which clears the jars.
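A minimal sketch of the `cleanup()` idea. It assumes a hypothetical context class (`ListeningContextSketch`, not Flink API) that owns a closeable resource standing in for the `BlobCache`/`BlobService` and releases it once the job result future completes. A shutdown hook, like the one in `BlobCache`, would remain the fallback if the result never arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical context that owns a closeable resource (e.g. a blob cache) for one job. */
final class ListeningContextSketch<T> {
    private final CompletableFuture<T> resultFuture;
    private final AutoCloseable resource;
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

    ListeningContextSketch(CompletableFuture<T> resultFuture, AutoCloseable resource) {
        this.resultFuture = resultFuture;
        this.resource = resource;
        // Release the resource as soon as the job result (or a failure) has been delivered.
        resultFuture.whenComplete((result, error) -> cleanup());
    }

    CompletableFuture<T> resultFuture() {
        return resultFuture;
    }

    /** Idempotent: only the first call closes the resource. */
    void cleanup() {
        if (cleanedUp.compareAndSet(false, true)) {
            try {
                resource.close();
            } catch (Exception e) {
                // A real implementation would log this; the sketch ignores it.
            }
        }
    }

    boolean isCleanedUp() {
        return cleanedUp.get();
    }
}
```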


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75122721
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		BlobCache blobClient = null;
    +		try {
    +			final Object jmAnswer;
    +			try {
    +				jmAnswer = Await.result(
    +					jobManager.ask(
    +						new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +			} catch (Exception e) {
    +				throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +			}
    +
    +			if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +				JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +				Option<String> jmHost = jobManager.actor().path().address().host();
    +				String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +				InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +				blobClient = new BlobCache(serverAddress, config);
    +
    +				final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +				final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +				final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +				int pos = 0;
    +				for (BlobKey blobKey : props.requiredJarFiles()) {
    +					try {
    +						allURLs[pos++] = blobClient.getURL(blobKey);
    +					} catch (Exception e) {
    +						throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    +					}
    +				}
    +
    +				for (URL url : requiredClasspaths) {
    +					allURLs[pos++] = url;
    +				}
    +
    +				return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +			} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +				throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +			} else {
    +				throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +			}
    +		} finally {
    +			if (blobClient != null) {
    +				blobClient.shutdown();
    --- End diff --
    
    Yes, thank you! This should only be called if the download fails. Otherwise, we rely on the BlobCache's shutdown hook.
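The rule described here (close the `BlobCache` only if downloading fails, otherwise keep it open and rely on its shutdown hook) can be sketched as a small generic helper. `ResourceFunction` and `CloseOnFailure.buildOrClose` are hypothetical names for illustration, not part of the PR:

```java
/** A function over a resource that may throw (hypothetical helper type). */
@FunctionalInterface
interface ResourceFunction<R, T> {
    T apply(R resource) throws Exception;
}

final class CloseOnFailure {
    private CloseOnFailure() {}

    /**
     * Applies {@code build} to {@code resource}. On success the resource stays open and
     * ownership passes to the caller; on failure the resource is closed before rethrowing.
     */
    static <R extends AutoCloseable, T> T buildOrClose(R resource, ResourceFunction<R, T> build)
            throws Exception {
        try {
            return build.apply(resource);
        } catch (Exception e) {
            try {
                resource.close();
            } catch (Exception closeError) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}
```

On success the caller owns the open resource; on failure any error from `close()` is attached as a suppressed exception instead of hiding the original one.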



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75474811
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    Actually, I'm not really sure about this corner case. We don't typically retry client-side operations if the leader changes after we have retrieved it. Instead, we just throw an error (see all the methods in `ClusterClient`). The `JobClientActor` is the exception here, and it has to be, because it operates independently of the user function.
    
    So we could fail if we can't reconstruct the class loader. The caveat is that job retrieval could then fail even if the user doesn't use custom classes in the JobExecutionResult or in exceptions (e.g., a firewall blocking the BlobManager port). That's why I didn't want to enforce this step, but we could enforce it and fix any problems with the BlobManager communication as they come up.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75305767
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---
    @@ -58,10 +62,63 @@ class JobInfo(
         }
       }
     
    -  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
    +
    +  /**
    +    * Notifies all clients by sending a message
    +    * @param message the message to send
    +    */
    +  def notifyClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Notifies all clients which are not of type detached
    +    * @param message the message to sent to non-detached clients
    +    */
    +  def notifyNonDetachedClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, ListeningBehaviour.DETACHED) =>
    +        // do nothing
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Sends a message to job clients that match the listening behavior
    +    * @param message the message to send to all clients
    +    * @param listeningBehaviour the desired listening behaviour
    +    */
    +  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
    +    clients foreach {
    +      case (clientActor, `listeningBehaviour`) =>
    +        clientActor ! message
    +      case _ =>
    +    }
    +  }
     
       def setLastActive() =
         lastActive = System.currentTimeMillis()
    +
    +
    +  override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)"
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case that: JobInfo =>
    +      this.isInstanceOf[JobInfo] &&
    --- End diff --
    
    Why do we need this check here?



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Updated according to our discussion in the comments.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75307414
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---
    @@ -58,10 +62,63 @@ class JobInfo(
         }
       }
     
    -  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
    +
    +  /**
    +    * Notifies all clients by sending a message
    +    * @param message the message to send
    +    */
    +  def notifyClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Notifies all clients which are not of type detached
    +    * @param message the message to sent to non-detached clients
    +    */
    +  def notifyNonDetachedClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, ListeningBehaviour.DETACHED) =>
    +        // do nothing
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Sends a message to job clients that match the listening behavior
    +    * @param message the message to send to all clients
    +    * @param listeningBehaviour the desired listening behaviour
    +    */
    +  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
    +    clients foreach {
    +      case (clientActor, `listeningBehaviour`) =>
    +        clientActor ! message
    +      case _ =>
    +    }
    +  }
     
       def setLastActive() =
         lastActive = System.currentTimeMillis()
    +
    +
    +  override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)"
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case that: JobInfo =>
    +      this.isInstanceOf[JobInfo] &&
    --- End diff --
    
    Yes, that's unnecessary because of the case type check.



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    I did a quick pass over the code. I think this change needs another review by our Actor expert @tillrohrmann ;)



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Merging this if there are no further comments.



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    I've made the last changes concerning the lazy reconstruction of the class loader we discussed. Rebased to master. Should be good to go now. 
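The discussion of that lazy reconstruction is not part of this excerpt. As an illustration only, a memoizing holder could defer the expensive step (in the PR, `retrieveClassLoader`) until the class loader is first needed, run it at most once, and fall back to a default loader on failure, as `attachToRunningJob` does. `LazyClassLoader` is a hypothetical name; the merged change may be structured differently.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Hypothetical holder that reconstructs a class loader on first use and caches the result. */
final class LazyClassLoader implements Supplier<ClassLoader> {
    private final Callable<ClassLoader> reconstruct;
    private final ClassLoader fallback;
    private volatile ClassLoader cached;

    LazyClassLoader(Callable<ClassLoader> reconstruct, ClassLoader fallback) {
        this.reconstruct = reconstruct;
        this.fallback = fallback;
    }

    @Override
    public ClassLoader get() {
        ClassLoader result = cached;
        if (result == null) {
            // Double-checked locking: only one thread performs the reconstruction.
            synchronized (this) {
                result = cached;
                if (result == null) {
                    try {
                        result = reconstruct.call();
                    } catch (Exception e) {
                        // Same policy as the PR: fall back instead of failing the attachment.
                        result = null;
                    }
                    if (result == null) {
                        result = fallback;
                    }
                    cached = result;
                }
            }
        }
        return result;
    }
}
```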



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75310003
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    --- End diff --
    
    Does it make sense to clean up this `BlobCache` once the job execution result has been delivered?



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Thanks for the helpful reviews, @tillrohrmann and @rmetzger.



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    CC @rmetzger @tillrohrmann Could you please take a look? I would like to merge this. Tests are passing: https://travis-ci.org/mxm/flink/builds/151653198



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75669417
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    Or something like an immutable future.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75650523
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    +			} catch (InterruptedException e) {
    +				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    +			}
    +
    +			try {
    +				Await.result(
    +					Patterns.ask(
    +						jobClientActor,
    +						JobClientMessages.getPing(),
    --- End diff --
    
    Good idea.
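The loop in the diff above sleeps 250 ms at a time and pings the `JobClientActor` in between to notice if it has died. A rough plain-Java analogue, assuming a `CompletableFuture` for the result and a hypothetical `isAlive` check in place of the ping message, might look like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class PollingAwait {
    private PollingAwait() {}

    /**
     * Waits for {@code result} in slices of {@code pollMillis}. Between slices it calls
     * {@code isAlive}; if the listener is gone, it fails instead of blocking forever.
     */
    static <T> T await(CompletableFuture<T> result, BooleanSupplier isAlive, long pollMillis)
            throws InterruptedException, ExecutionException {
        while (true) {
            try {
                return result.get(pollMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException notYet) {
                // Only give up if the result is still missing and the listener is gone.
                if (!result.isDone() && !isAlive.getAsBoolean()) {
                    throw new IllegalStateException("Listener died before delivering a result");
                }
            }
        }
    }
}
```

A timed `get` returns as soon as the result arrives instead of at the next 250 ms tick, and the `isDone()` re-check narrows the window in which a result that lands just as the listener dies would be discarded.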



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Good work @mxm. I made some minor comments inline.
    
    Just for my own clarification: is it still planned to have a new kind of `JobClient` which is bound to a specific job and which can be used to issue job-specific calls such as `cancel`, `stop`, execution result retrieval, etc.? I thought that the `ClusterClient` is used to communicate with the cluster, whereas the `JobClient` is responsible for the job communication. Will this be a follow-up?
    
    The test case `JobClientActorTest.testConnectionTimeoutAfterJobRegistration` is failing on Travis.
    
    After addressing the comments +1 for merging.
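Purely to illustrate the split asked about here (cluster-level calls on `ClusterClient`, job-level calls on a client bound to one job), such a job-bound client might expose an interface like the one below. `BoundJobClient` and its in-memory fake are hypothetical; a plain `String` stands in for Flink's `JobID`, and a real implementation would message the JobManager.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical client bound to a single job, as opposed to a cluster-wide client. */
interface BoundJobClient<R> {
    String jobId();

    void cancel();

    void stop();

    CompletableFuture<R> result();
}

/** In-memory fake that only tracks state; it does not talk to any cluster. */
final class FakeBoundJobClient implements BoundJobClient<String> {
    private final String jobId;
    private final CompletableFuture<String> result = new CompletableFuture<>();

    FakeBoundJobClient(String jobId) {
        this.jobId = jobId;
    }

    @Override
    public String jobId() {
        return jobId;
    }

    @Override
    public void cancel() {
        result.completeExceptionally(new IllegalStateException("Job " + jobId + " cancelled"));
    }

    @Override
    public void stop() {
        result.complete("stopped");
    }

    @Override
    public CompletableFuture<String> result() {
        return result;
    }
}
```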



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75301960
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    What if the `JobManager` has already changed at this point? We would no longer be able to retrieve the ClassLoader, would we?



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75307091
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---
    @@ -100,13 +101,51 @@ public void testSubmissionTimeout() throws Exception {
     		Await.result(jobExecutionResult, timeout);
     	}
     
    +
    +	/** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration
    +	 * cannot be performd at the JobManager by the JobClientActor. This is here the case, because the
    +	 * started JobManager never replies to a {@link RegisterJobClient} message.
    +	 */
    +	@Test(expected=JobClientActorRegistrationTimeoutException.class)
    +	public void testRegistrationTimeout() throws Exception {
    +		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
    +		FiniteDuration timeout = jobClientActorTimeout.$times(2);
    +
    +		UUID leaderSessionID = UUID.randomUUID();
    +
    +		ActorRef jobManager = system.actorOf(
    +			Props.create(
    +				PlainActor.class,
    +				leaderSessionID));
    +
    +		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
    +			jobManager.path().toString(),
    +			leaderSessionID
    +		);
    +
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			testingLeaderRetrievalService,
    +			jobClientActorTimeout,
    +			false);
    +
    +		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
    +
    +
    --- End diff --
    
    Two line breaks


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75442830
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
     					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
     			}
     		}
    +		else if (message instanceof AttachToJobAndWait) {
    --- End diff --
    
    I agree that this design could be improved. I'll consider factoring out common code into a base class.
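A minimal sketch of such a refactoring, under stated assumptions: the names `AbstractJobClientSketch`, `SubmittingJobClient`, and `AttachingJobClient` are hypothetical, a `CompletableFuture<String>` stands in for the Akka ask future, and a plain leader address string replaces leader retrieval. The base class owns the shared lifecycle (connect, then complete the result future), while each subclass only decides what to do once the JobManager is known.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: these classes illustrate the refactoring idea only.
abstract class AbstractJobClientSketch {
    // Completed with the job result, or exceptionally on failure.
    private final CompletableFuture<String> result = new CompletableFuture<>();

    /** Shared lifecycle: connect to the leader, then run the mode-specific step. */
    final CompletableFuture<String> start(String leaderAddress) {
        if (leaderAddress == null) {
            result.completeExceptionally(new IllegalStateException("No JobManager leader available"));
            return result;
        }
        onLeaderConnected(leaderAddress);
        return result;
    }

    /** Mode-specific step: submit a new job or attach to an existing one. */
    protected abstract void onLeaderConnected(String leaderAddress);

    /** Called by subclasses once the job result is known. */
    protected final void completeWith(String jobResult) {
        result.complete(jobResult);
    }
}

final class SubmittingJobClient extends AbstractJobClientSketch {
    @Override
    protected void onLeaderConnected(String leaderAddress) {
        // A real client would send the JobGraph; here the job "finishes" immediately.
        completeWith("submitted via " + leaderAddress);
    }
}

final class AttachingJobClient extends AbstractJobClientSketch {
    private final String jobId;

    AttachingJobClient(String jobId) {
        this.jobId = jobId;
    }

    @Override
    protected void onLeaderConnected(String leaderAddress) {
        // A real client would register for updates of an already running job.
        completeWith("attached to " + jobId + " via " + leaderAddress);
    }
}
```

The mode-specific message handling (`SubmitJobAndWait` vs. `AttachToJobAndWait`) would then live in the subclasses, and the leader retrieval, timeout, and disconnect handling in the base class.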


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75902171
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult awaitJobResult(JobListeningContext listeningCon
     		final JobID jobID = listeningContext.getJobID();
     		final ActorRef jobClientActor = listeningContext.getJobClientActor();
     		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    +		final FiniteDuration askTimeout = listeningContext.getTimeout();
     		// retrieves class loader if necessary
     		final ClassLoader classLoader = listeningContext.getClassLoader();
     
    +		// wait for the future which holds the result to be ready
    +		// ping the JobClientActor from time to time to check if it is still running
     		while (!jobSubmissionFuture.isCompleted()) {
     			try {
    -				Thread.sleep(250);
    -			} catch (InterruptedException e) {
    -				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    -			}
    -
    -			try {
    -				Await.result(
    -					Patterns.ask(
    -						jobClientActor,
    -						JobClientMessages.getPing(),
    -						Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -					AkkaUtils.getDefaultTimeout());
    +				Await.ready(jobSubmissionFuture, askTimeout);
     			} catch (Exception e) {
    --- End diff --
    
    We throw the exception anyway afterwards. The only difference is that we wrap the exception and throw it only if the future has not been completed in the meantime.
    
    We would have to catch `InterruptedException`, `TimeoutException`, and `IllegalArgumentException`. I'm not convinced this is necessary.


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75468671
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
    +				}
    +			}
    +
    +			for (URL url : requiredClasspaths) {
    +				allURLs[pos++] = url;
    +			}
    +
    +			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +		} else {
    +			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +		}
    +	}
    +
    +	/**
    +	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to
    +	 * @param listeningContext The listening context of the job execution
    +	 * @return The result of the execution
    +	 * @throws JobExecutionException if anything goes wrong while monitoring the job
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    +
    +		final JobID jobID = listeningContext.jobID;
    +		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
    +		final ClassLoader classLoader = listeningContext.classLoader;
    +
     		// first block handles errors while waiting for the result
    -		Object answer;
    +		final Object answer;
     		try {
    -			Future<Object> future = Patterns.ask(jobClientActor,
    -					new JobClientMessages.SubmitJobAndWait(jobGraph),
    -					new Timeout(AkkaUtils.INF_TIMEOUT()));
    -			
    -			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
    +			answer = Await.result(jobSubmissionFuture, AkkaUtils.INF_TIMEOUT());
    --- End diff --
    
    Okay, will ping the actor periodically to check if it is alive.
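One way to sketch the periodic liveness check, assuming a `java.util.concurrent.CompletableFuture` in place of the Scala future and a hypothetical `BooleanSupplier isAlive` in place of pinging the `JobClientActor` (`ResultPoller` is likewise hypothetical): wait for the result in bounded slices and, whenever a slice times out, verify that the component responsible for completing the future still exists.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class ResultPoller {
    private ResultPoller() {}

    /**
     * Waits for resultFuture in slices of pollMillis. After each timed-out slice,
     * isAlive (a stand-in for pinging the JobClientActor) is consulted; if the
     * producer is gone and the future is still open, waiting is aborted.
     */
    static <T> T awaitWithLiveness(CompletableFuture<T> resultFuture,
                                   BooleanSupplier isAlive,
                                   long pollMillis) throws Exception {
        while (true) {
            try {
                return resultFuture.get(pollMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Not done yet: make sure someone can still complete the future.
                if (!isAlive.getAsBoolean() && !resultFuture.isDone()) {
                    throw new IllegalStateException("Result producer terminated before completing the future");
                }
            } catch (ExecutionException e) {
                // The future completed exceptionally: surface the original cause.
                Throwable cause = e.getCause();
                throw cause instanceof Exception ? (Exception) cause : e;
            }
        }
    }
}
```

Blocking on the future itself, rather than sleeping for a fixed interval, lets the loop return as soon as the result arrives.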


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75123576
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    --- End diff --
    
    That's not possible because the `JobClientActor` will complete this future with the result of the job execution, which may be delayed indefinitely. In all other cases (i.e. timeout to register at the JobManager, failure to attach to the job, failure to submit the job), the `JobClientActor` will complete the future with a failure message.
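The argument is that an unbounded ask is safe as long as every failure path completes the future. A minimal sketch of that guarantee, assuming `CompletableFuture`s in place of Akka futures (`RegistrationGuard` and its parameters are hypothetical): only the registration step is bounded by a timeout; once registered, the job result may take arbitrarily long.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class RegistrationGuard {
    private RegistrationGuard() {}

    static CompletableFuture<String> guard(CompletableFuture<Void> registered,
                                           CompletableFuture<String> jobResult,
                                           ScheduledExecutorService scheduler,
                                           long registrationTimeoutMillis) {
        CompletableFuture<String> out = new CompletableFuture<>();
        // Failure path 1: registration is not confirmed in time.
        scheduler.schedule(() -> {
            if (!registered.isDone()) {
                out.completeExceptionally(new TimeoutException("Registration at JobManager timed out"));
            }
        }, registrationTimeoutMillis, TimeUnit.MILLISECONDS);
        // Failure path 2: registration (submission or attachment) is rejected.
        registered.whenComplete((ignored, error) -> {
            if (error != null) {
                out.completeExceptionally(error);
            }
        });
        // The job outcome itself; no timeout applies here.
        jobResult.whenComplete((value, error) -> {
            if (error != null) {
                out.completeExceptionally(error);
            } else {
                out.complete(value);
            }
        });
        return out;
    }
}
```

A caller can then wait on the returned future without a timeout, because it cannot stay open forever unless the job itself runs forever.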


[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Rebased to the changes on master. Merging after tests pass again.


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75907814
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult awaitJobResult(JobListeningContext listeningCon
     		final JobID jobID = listeningContext.getJobID();
     		final ActorRef jobClientActor = listeningContext.getJobClientActor();
     		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    +		final FiniteDuration askTimeout = listeningContext.getTimeout();
     		// retrieves class loader if necessary
     		final ClassLoader classLoader = listeningContext.getClassLoader();
     
    +		// wait for the future which holds the result to be ready
    +		// ping the JobClientActor from time to time to check if it is still running
     		while (!jobSubmissionFuture.isCompleted()) {
     			try {
    -				Thread.sleep(250);
    -			} catch (InterruptedException e) {
    -				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    -			}
    -
    -			try {
    -				Await.result(
    -					Patterns.ask(
    -						jobClientActor,
    -						JobClientMessages.getPing(),
    -						Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -					AkkaUtils.getDefaultTimeout());
    +				Await.ready(jobSubmissionFuture, askTimeout);
     			} catch (Exception e) {
    --- End diff --
    
    Ah, I thought you were commenting on the inner catch block. Yes, it makes sense to catch only `TimeoutException` and `InterruptedException` here for `Await.ready`. `Await.result`, on the other hand, is declared to throw a plain `Exception`.
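For illustration, a hypothetical `ReadyWait.awaitReady` helper built on `CompletableFuture.get(long, TimeUnit)` that mimics the `Await.ready` semantics discussed here: a timeout means "not ready yet", interruption is propagated to the caller, and a future that failed or was cancelled still counts as ready because its failure is extracted later. This is an analogy in plain Java, not the Scala `Await` API.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReadyWait {
    private ReadyWait() {}

    /** Returns true once the future is completed in any way; false on timeout. */
    static boolean awaitReady(CompletableFuture<?> future, long timeoutMillis)
            throws InterruptedException {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false; // not completed yet: the caller may ping and retry
        } catch (ExecutionException | CancellationException e) {
            // Completed, just not successfully: the failure is read later.
        }
        return true;
    }
}
```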


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75459375
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    True, this code assumes that the JobManager doesn't change between retrieving the leading JobManager and retrieving the class loader. There is always a possible gap in which the JobManager could change. We could mitigate this by retrying in case it has changed.
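A sketch of that retry mitigation, assuming hypothetical `currentLeader` and `fetch` functions that stand in for leader retrieval and the `RequestClassloadingProps` request (`LeaderRetry` is also hypothetical). A failure is retried only if the leader changed between the lookup and the call; with an unchanged leader, the failure is treated as genuine and rethrown.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class LeaderRetry {
    private LeaderRetry() {}

    static <T> T callWithLeaderRetry(Supplier<String> currentLeader,
                                     Function<String, T> fetch,
                                     int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String leader = currentLeader.get();
            try {
                return fetch.apply(leader);
            } catch (RuntimeException e) {
                lastFailure = e;
                if (leader.equals(currentLeader.get())) {
                    // Same leader as before the call: retrying is unlikely to help.
                    throw e;
                }
                // Leader changed between lookup and request: retry against the new one.
            }
        }
        throw lastFailure;
    }
}
```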


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75676249
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    My bad, the future was only completed due to some faulty testing code.


[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    @tillrohrmann Pinging the actor now to check if it is still alive. Also added another test case for that.


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75442774
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
    +				}
    +			}
    +
    +			for (URL url : requiredClasspaths) {
    +				allURLs[pos++] = url;
    +			}
    +
    +			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +		} else {
    +			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +		}
    +	}
    +
    +	/**
    +	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to
    +	 * @param listeningContext The listening context of the job execution
    +	 * @return The result of the execution
    +	 * @throws JobExecutionException if anything goes wrong while monitoring the job
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    +
    +		final JobID jobID = listeningContext.jobID;
    +		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
    +		final ClassLoader classLoader = listeningContext.classLoader;
    +
     		// first block handles errors while waiting for the result
    -		Object answer;
    +		final Object answer;
     		try {
    -			Future<Object> future = Patterns.ask(jobClientActor,
    -					new JobClientMessages.SubmitJobAndWait(jobGraph),
    -					new Timeout(AkkaUtils.INF_TIMEOUT()));
    -			
    -			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
    +			answer = Await.result(jobSubmissionFuture, AkkaUtils.INF_TIMEOUT());
    --- End diff --
    
    That's an issue that was present before in `JobClientActor`. Couldn't we ensure with Akka that the actor sends a message before it dies?
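In Akka, one option would be for the actor to remember the original sender and reply with a failure from its `postStop()` hook, which runs when the actor is stopped. The same guarantee, sketched without Akka (the `CompleteOnExit` helper is hypothetical): the worker thread completes the pending future in a `finally` block, which is a no-op if a result was already delivered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CompleteOnExit {
    private CompleteOnExit() {}

    /** Runs work on a new thread; the returned future is always completed when it ends. */
    static CompletableFuture<String> run(Consumer<CompletableFuture<String>> work) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                work.accept(result);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            } finally {
                // No-op if the future was already completed above.
                result.completeExceptionally(
                        new IllegalStateException("Worker terminated without a result"));
            }
        });
        worker.start();
        return result;
    }
}
```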


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r72887224
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---
    @@ -37,8 +37,10 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
      * @param start Starting time
      */
     class JobInfo(
    -  val client: ActorRef,
    -  val listeningBehaviour: ListeningBehaviour,
    +
    +  var client: ActorRef,
    +  var listeningBehaviour: ListeningBehaviour,
    --- End diff --
    
    We might want to allow multiple clients here. Otherwise only the most recently registered client will receive updates.
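A sketch of what multiple clients could look like, written in Java for illustration although `JobInfo` is Scala. `MultiClientJobInfo` is hypothetical, clients are modeled as `Consumer<String>` instead of `ActorRef`, and a per-client `ListeningBehaviour` is omitted. A `CopyOnWriteArraySet` keeps iteration safe while clients attach or detach.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

final class MultiClientJobInfo {
    // Every registered client, instead of a single reassignable reference.
    private final Set<Consumer<String>> clients = new CopyOnWriteArraySet<>();

    void registerClient(Consumer<String> client) {
        clients.add(client);
    }

    void unregisterClient(Consumer<String> client) {
        clients.remove(client);
    }

    /** Delivers an update to all currently registered clients. */
    void notifyClients(String update) {
        for (Consumer<String> client : clients) {
            client.accept(update);
        }
    }

    int clientCount() {
        return clients.size();
    }
}
```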


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75644258
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    We do sleep here?


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75483059
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    Yes that could be a good solution :-)



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75650460
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    Sure, we can do that. Then we will have longer intervals between the checks, but that is probably fine.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75308861
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.clients.examples;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.testkit.JavaTestKit;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.client.program.StandaloneClusterClient;
    +import org.apache.flink.runtime.client.JobRetrievalException;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import scala.collection.Seq;
    +
    +import java.util.concurrent.Semaphore;
    +
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.fail;
    +
    +
    +/**
    + * Tests retrieval of a job from a running Flink cluster
    + */
    +public class JobRetrievalITCase {
    --- End diff --
    
    `extends TestLogger` missing



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75305261
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
     					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
     			}
     		}
    +		else if (message instanceof AttachToJobAndWait) {
    --- End diff --
    
    At the moment, the `JobClientActor` can be used to attach to a job and to submit a new job at the same time. Then, depending on the order in which the jobs terminate, either the submitted job's or the attached job's result is reported back, but the other future will never be completed. It might make sense to guard against this, or to have two different JobClientActors based on the same base class.
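
    One way to guard against this, sketched in plain Java rather than as an Akka actor: the listener accepts exactly one request over its lifetime and rejects any later one with an already-failed future, so no caller is left waiting forever. `SingleJobListener`, `request` and `jobFinished` are hypothetical names for illustration, not Flink API, and the job result is simplified to a `String`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the JobClientActor's request handling:
// it serves exactly one request (submit OR attach) over its lifetime.
final class SingleJobListener {
    // Future of the one request being served; null until the first request arrives.
    private final AtomicReference<CompletableFuture<String>> pending = new AtomicReference<>();

    // The first request wins; any later request fails fast instead of
    // receiving a future that would never be completed.
    CompletableFuture<String> request(String kind, String jobId) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        if (pending.compareAndSet(null, fresh)) {
            return fresh;
        }
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException(
                "Already serving a request; refusing to " + kind + " job " + jobId));
        return rejected;
    }

    // Called when the monitored job terminates; completes the single pending future.
    void jobFinished(String result) {
        CompletableFuture<String> current = pending.get();
        if (current != null) {
            current.complete(result);
        }
    }
}
```

    The other option mentioned, two actor classes sharing a common base class, would enforce the same invariant by construction instead of at runtime.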



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75476846
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    Yeah, that's the question: shall we fail, or try to proceed on a best-effort basis? If you have user code classes in your result, then the deserialization will fail later on, right? In that case, it would be better imo if the user tried the operation again, because the failure might have been caused by a leader change. On the other hand, you might only be interested in the cancel and stop job commands and not in the deserialized result.
    
    Would it be possible to first connect to the `JobManager` and to reconstruct the class loader only when we want to wait for the job result? If that fails, we throw an exception.
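
    A minimal sketch of this lazy variant, assuming the retrieval step can be wrapped in a `Callable` (in the PR that would be a call like `retrieveClassLoader(...)`). `LazyClassLoader` is a hypothetical name, not part of Flink. Nothing is downloaded until the first `get()`; a failure is thrown to the caller instead of silently falling back to the system class loader, and only a successful result is cached, so a retry after a leader change remains possible.

```java
import java.util.concurrent.Callable;

// Hypothetical holder: defers class loader reconstruction until the job result
// is actually needed, so cancel/stop commands never pay for (or fail on) the download.
final class LazyClassLoader {
    private final Callable<ClassLoader> retriever;
    private ClassLoader cached;

    LazyClassLoader(Callable<ClassLoader> retriever) {
        this.retriever = retriever;
    }

    // Retrieves on first use. A failure propagates and is NOT cached,
    // so a later call can try again.
    synchronized ClassLoader get() {
        if (cached == null) {
            try {
                cached = retriever.call();
            } catch (Exception e) {
                throw new IllegalStateException("Could not reconstruct the user code class loader", e);
            }
        }
        return cached;
    }
}
```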



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75452006
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    +				}
    +			}
    +
    +			for (URL url : requiredClasspaths) {
    +				allURLs[pos++] = url;
    +			}
    +
    +			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +		} else {
    +			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +		}
    +	}
    +
    +	/**
    +	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to
    +	 * @param listeningContext The listening context of the job execution
    +	 * @return The result of the execution
    +	 * @throws JobExecutionException if anything goes wrong while monitoring the job
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    +
    +		final JobID jobID = listeningContext.jobID;
    +		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
    +		final ClassLoader classLoader = listeningContext.classLoader;
    +
     		// first block handles errors while waiting for the result
    -		Object answer;
    +		final Object answer;
     		try {
    -			Future<Object> future = Patterns.ask(jobClientActor,
    -					new JobClientMessages.SubmitJobAndWait(jobGraph),
    -					new Timeout(AkkaUtils.INF_TIMEOUT()));
    -			
    -			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
    +			answer = Await.result(jobSubmissionFuture, AkkaUtils.INF_TIMEOUT());
    --- End diff --
    
    I'm not sure whether this can always be guaranteed. But you could periodically check whether the actor is still alive.
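
    A sketch of the sleep-and-ping approach in plain Java: a `CompletableFuture` stands in for the Akka result future, and a hypothetical `BooleanSupplier actorAlive` stands in for a `Ping` ask to the `JobClientActor`. Because the result future itself never times out, liveness has to be checked separately between sleeps.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: sleep, then check whether the listening actor is still alive.
final class SleepAndPing {
    static <T> T await(CompletableFuture<T> result, BooleanSupplier actorAlive, long sleepMillis)
            throws InterruptedException, ExecutionException {
        while (!result.isDone()) {
            Thread.sleep(sleepMillis); // back off before the next check
            if (!result.isDone() && !actorAlive.getAsBoolean()) {
                // Corresponds to the Ping ask failing: nobody will ever complete the future.
                throw new IllegalStateException("Listening actor died before the job finished");
            }
        }
        return result.get(); // already done, so this returns immediately
    }
}
```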



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75900649
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult awaitJobResult(JobListeningContext listeningCon
     		final JobID jobID = listeningContext.getJobID();
     		final ActorRef jobClientActor = listeningContext.getJobClientActor();
     		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
    +		final FiniteDuration askTimeout = listeningContext.getTimeout();
     		// retrieves class loader if necessary
     		final ClassLoader classLoader = listeningContext.getClassLoader();
     
    +		// wait for the future which holds the result to be ready
    +		// ping the JobClientActor from time to time to check if it is still running
     		while (!jobSubmissionFuture.isCompleted()) {
     			try {
    -				Thread.sleep(250);
    -			} catch (InterruptedException e) {
    -				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    -			}
    -
    -			try {
    -				Await.result(
    -					Patterns.ask(
    -						jobClientActor,
    -						JobClientMessages.getPing(),
    -						Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -					AkkaUtils.getDefaultTimeout());
    +				Await.ready(jobSubmissionFuture, askTimeout);
     			} catch (Exception e) {
    --- End diff --
    
    I think we can narrow down this exception here.
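
    A sketch of the narrower handling, again with `CompletableFuture` standing in for the Scala future and a hypothetical `actorAlive` probe. Only `TimeoutException` is treated as "not finished yet"; interruption and job failure (`ExecutionException`) propagate to the caller instead of being swallowed by a blanket `catch (Exception e)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: bounded waits on the result future with a narrowed catch.
final class BoundedWait {
    static <T> T await(CompletableFuture<T> result, BooleanSupplier actorAlive, long waitMillis)
            throws InterruptedException, ExecutionException {
        while (true) {
            try {
                // Blocks at most waitMillis; a timeout here leaves the future itself untouched.
                return result.get(waitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException notYet) {
                // Only a timeout means "keep waiting", after checking the actor is alive.
                if (!actorAlive.getAsBoolean()) {
                    throw new IllegalStateException("Listening actor died before the job finished");
                }
            }
        }
    }
}
```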



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75650141
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    +			} catch (InterruptedException e) {
    +				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
    +			}
    +
    +			try {
    +				Await.result(
    +					Patterns.ask(
    +						jobClientActor,
    +						JobClientMessages.getPing(),
    --- End diff --
    
    Sure, we can do that.



[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2313
  
    Thanks for the review @tillrohrmann. Yes, the plan is to have a `JobClient` API class (the existing JobClient class will be renamed) which uses the SubmissionContext to supervise submitted jobs or attach to existing jobs. All the job-related methods from `ClusterClient` will be moved to this new class.



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2313



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75442857
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.clients.examples;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.testkit.JavaTestKit;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.client.program.StandaloneClusterClient;
    +import org.apache.flink.runtime.client.JobRetrievalException;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import scala.collection.Seq;
    +
    +import java.util.concurrent.Semaphore;
    +
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.fail;
    +
    +
    +/**
    + * Tests retrieval of a job from a running Flink cluster
    + */
    +public class JobRetrievalITCase {
    --- End diff --
    
    Thanks



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75669289
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
     	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
     
     		final JobID jobID = listeningContext.jobID;
    +		final ActorRef jobClientActor = listeningContext.jobClientActor;
     		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
     		final ClassLoader classLoader = listeningContext.classLoader;
     
    +		while (!jobSubmissionFuture.isCompleted()) {
    +			try {
    +				Thread.sleep(250);
    --- End diff --
    
    Actually, it's not quite that easy. If we simply wait on the ask timeout via `Await.ready/result`, then we complete the `jobSubmissionFuture` with a timeout (if the job runs longer than the timeout interval). Subsequent checks will then always return the same result.
    
    Thus, we probably need something like a sleep here.
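
    The distinction being drawn is between a timeout attached to the future itself (like an ask created with a finite `Timeout`) and a timeout that only bounds how long the caller blocks. A plain-Java analogue, not Akka or Scala code, using `CompletableFuture.orTimeout` (Java 9+); `TimeoutKinds` is a hypothetical class for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutKinds {
    // Bounded wait: only the caller gives up; the future stays pending
    // and still accepts the real result afterwards.
    static boolean staysPendingAfterBoundedWait() throws InterruptedException, ExecutionException {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            result.get(20, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            // this wait timed out, the future itself did not
        }
        boolean pending = !result.isDone();
        result.complete("job result");
        return pending && "job result".equals(result.join());
    }

    // Attached timeout: the future itself is completed exceptionally,
    // so every later check sees the timeout and the real result is rejected.
    static boolean stuckAfterAttachedTimeout() {
        CompletableFuture<String> result =
                new CompletableFuture<String>().orTimeout(20, TimeUnit.MILLISECONDS);
        try {
            result.join();
        } catch (CompletionException expected) {
            // wraps a TimeoutException
        }
        boolean lateResultAccepted = result.complete("job result");
        return result.isCompletedExceptionally() && !lateResultAccepted;
    }
}
```

    In the later revision quoted in this thread, the result future is created with `AkkaUtils.INF_TIMEOUT()` and only the `Await.ready(jobSubmissionFuture, askTimeout)` call is bounded, which appears to correspond to the first case.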



[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r74757666
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		BlobCache blobClient = null;
    +		try {
    +			final Object jmAnswer;
    +			try {
    +				jmAnswer = Await.result(
    +					jobManager.ask(
    +						new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +			} catch (Exception e) {
    +				throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +			}
    +
    +			if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +				JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +				Option<String> jmHost = jobManager.actor().path().address().host();
    +				String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +				InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +				blobClient = new BlobCache(serverAddress, config);
    +
    +				final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +				final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +				final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +				int pos = 0;
    +				for (BlobKey blobKey : props.requiredJarFiles()) {
    +					try {
    +						allURLs[pos++] = blobClient.getURL(blobKey);
    +					} catch (Exception e) {
    +						throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    +					}
    +				}
    +
    +				for (URL url : requiredClasspaths) {
    +					allURLs[pos++] = url;
    +				}
    +
    +				return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +			} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +				throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +			} else {
    +				throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +			}
    +		} finally {
    +			if (blobClient != null) {
    +				blobClient.shutdown();
    --- End diff --
    
    Doesn't this call delete all the blobs we've just downloaded?
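
    If `BlobCache.shutdown()` does remove its local storage directory (which is what the question implies; this sketch does not confirm it), then the URLs handed to the `URLClassLoader` would point at deleted files. One way out is to tie the cache's lifetime to the class loader and clean up only when the loader is closed. A minimal plain-Java sketch; `TempBlobStore` and `BlobBackedClassLoader` are hypothetical names, not Flink classes.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical stand-in for a blob cache that owns a local storage directory
// and deletes it on close (assumed behaviour, as the review question suggests).
final class TempBlobStore implements AutoCloseable {
    private final Path dir;

    TempBlobStore() throws IOException {
        dir = Files.createTempDirectory("blobs");
    }

    // "Downloads" a blob by writing it locally and returns its file URL.
    URL put(String name, byte[] data) throws IOException {
        Path file = dir.resolve(name);
        Files.write(file, data);
        return file.toUri().toURL();
    }

    // Deletes files first, then the directory (reverse walk order).
    @Override
    public void close() throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }
}

// Keeps the store alive for as long as the class loader is in use.
final class BlobBackedClassLoader extends URLClassLoader {
    private final TempBlobStore store;

    BlobBackedClassLoader(URL[] urls, ClassLoader parent, TempBlobStore store) {
        super(urls, parent);
        this.store = store;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            store.close();
        }
    }
}
```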

