Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/06/08 14:45:12 UTC

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2083

    [FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoints

    Disposing savepoints via the JobManager fails for state handles or descriptors that contain user classes (for example, custom folding state or RocksDB handles).
    
    With this change, the user has to provide either the job ID of a running job, so that the JobManager can use that job's user code class loader, or the job JARs when disposing a savepoint.
    
    This version breaks the API, as the CLI now requires either a JobID or a JAR. I think this is reasonable because the current approach only works for a subset of the available state variants.
    
    We could backport this to 1.0.4 and make the JobID or JAR arguments optional. What do you think?
    
    I've tested this with a job using RocksDB state, both while the job was running and after it terminated. This did not work with the current 1.0.3 version.
    
    Ideally, we will get rid of the whole disposal business when we make savepoints properly self-contained. I'm going to open a JIRA issue with a proposal to do so soon.
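The underlying problem is class resolution: a state handle or descriptor that references a user class can only be deserialized, and therefore disposed, by a class loader that can see the job's JARs. A minimal plain-JDK sketch, assuming the job JARs are available as local URLs; `UserCodeClassResolver` and its methods are hypothetical names, not Flink API, and the JobManager's actual class loading may differ:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper (not Flink API) illustrating why disposal needs the job JARs.
final class UserCodeClassResolver {

    private UserCodeClassResolver() {}

    /** Builds a loader that sees the given job JARs in addition to the parent's classes. */
    static URLClassLoader createUserCodeLoader(URL[] jobJars, ClassLoader parent) {
        // Standard parent-first delegation: JDK/framework classes come from the parent,
        // user classes are searched in the supplied job JARs.
        return new URLClassLoader(jobJars, parent);
    }

    /** Returns true if the class can be resolved through the given loader. */
    static boolean canResolve(ClassLoader loader, String className) {
        try {
            // initialize = false: only resolve the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Without the job JARs on the loader, a user class such as a custom fold function cannot be resolved, which is presumably why disposal through the JobManager's default class loader fails for these state variants.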

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3713-dispose_savepoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2083
    
----
commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-06-08T08:59:24Z

    [FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoint
    
    Disposing savepoints via the JobManager fails for state handles or descriptors
    that contain user classes (for example, custom folding state or RocksDB handles).
    
    With this change, the user has to provide either the job ID of a running job,
    so that the JobManager can use that job's user code class loader, or the job
    JARs when disposing a savepoint.
    
    This version breaks the API as the CLI now requires either a JobID or a JAR. I think
    this is reasonable, because the current approach only works for a subset of the
    available state variants.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68587921
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
    @@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
     		formatter.setSyntaxPrefix("  \"savepoint\" action options:");
     		formatter.printHelp(" ", getSavepointOptions(new Options()));
     		System.out.println();
    +		System.out.println("\n  Examples:");
    +		System.out.println("  - Trigger savepoint: bin/flink savepoint <Job ID>");
    +		System.out.println("  - Dispose savepoint:");
    +		System.out.println("    * For a running job: bin/flink savepoint -d <Path> <Job ID>");
    +		System.out.println("    * For a terminated job: bin/flink savepoint -d <Path> -j <Jar> [-c <mainClass> -C <classPath>]");
    --- End diff --
    
    True, that's inconsistent.



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68580451
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    --- End diff --
    
    I think we have to call `program.deleteExtractedLibraries()` at the end to clean up any extracted libraries.
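One way to guarantee this cleanup is a `try`/`finally` around everything that follows the construction of the program, so extracted libraries are deleted even when the JAR upload or the disposal request fails. A self-contained sketch; `ExtractingProgram` and `DisposeWithJar` are hypothetical stand-ins, and only the `deleteExtractedLibraries()` name is taken from the comment above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PackagedProgram: "extracts" libraries on construction
// and must be told to delete them afterwards.
final class ExtractingProgram {
    final List<String> extractedLibraries = new ArrayList<>();

    ExtractingProgram(String jarFile) {
        extractedLibraries.add(jarFile + "!/lib/dependency.jar");
    }

    void deleteExtractedLibraries() {
        extractedLibraries.clear();
    }
}

final class DisposeWithJar {
    private DisposeWithJar() {}

    /** Runs the upload-and-dispose step and always cleans up, even if the step throws. */
    static void dispose(ExtractingProgram program, Runnable uploadAndDispose) {
        try {
            uploadAndDispose.run();
        } finally {
            // Executed on both the success and the failure path.
            program.deleteExtractedLibraries();
        }
    }
}
```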



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    Good work @uce :-)
    
    I agree that it is currently a bit clumsy to discard savepoints of jobs that are no longer running. From the user's perspective, it should be as easy as possible.
    
    I also think that it would be a good idea to add the RocksDB jar to the flink-dist.jar, since all serious users are using it. Furthermore, I think it would be a good idea not only to log possible exceptions in `SubtaskState` but also to communicate them back to the user. Thus, letting them bubble up and handling them on the `JobManager` level would be the right way to go.
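A sketch of letting disposal failures bubble up instead of only logging them: attempt to discard every handle, remember the first failure, attach later failures as suppressed exceptions, and rethrow at the end so the caller (for example, the `JobManager`) can report it. `DisposableHandle` and `StateDisposal.disposeAll` are hypothetical names, not the actual `SubtaskState` API:

```java
import java.util.List;

// Hypothetical stand-in for a state handle that may fail to discard itself.
interface DisposableHandle {
    void discard() throws Exception;
}

final class StateDisposal {
    private StateDisposal() {}

    /**
     * Attempts to discard all handles. Instead of logging and continuing silently,
     * collects failures and rethrows them once every handle has been attempted.
     */
    static void disposeAll(List<? extends DisposableHandle> handles) throws Exception {
        Exception firstFailure = null;
        for (DisposableHandle handle : handles) {
            try {
                handle.discard();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
```

Attempting every handle before rethrowing keeps the "best effort" cleanup of the logging approach while still surfacing the failure to the user.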
    
    I had some minor comments concerning test coverage and the way the job jars are constructed.



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68571489
  
    --- Diff: docs/apis/cli.md ---
    @@ -279,13 +289,27 @@ guarantees for a stop request.
     
     Action "savepoint" triggers savepoints for a running job or disposes existing ones.
     
    -  Syntax: savepoint [OPTIONS] <Job ID>
    -  "savepoint" action options:
    -     -d,--dispose <savepointPath>   Disposes an existing savepoint.
    -     -m,--jobmanager <host:port>    Address of the JobManager (master) to which
    --- End diff --
    
    What happened to the jobmanager option?



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    It seems very clumsy to dispose savepoints like that. If that is currently needed for a fix, then I guess we have to live with it.
    
    It would make sense to make the JAR and job ID optional, so that non-custom cases can dispose savepoints the simple way.
    Then, we could add the RocksDB state backend to the jar in the Flink distribution's lib folder, so that it is always available and disposal would not need a JAR or job ID. That would give the least user friction...
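With both arguments optional, the client only has to pick one of three disposal paths. A sketch of that selection; `DisposalMode` and `DisposalModeSelector` are hypothetical names, and the job ID is simplified to a `String` instead of Flink's `JobID`. Note that the guard in the `CliFrontend` diff quoted above, `if (jobId != null && jarFile != null)`, fires only when both arguments are given, which does not match its message "Cannot dispose savepoint without Job ID or JAR."; with optional arguments neither case needs to be rejected:

```java
// Hypothetical enum: which class loader the JobManager should use for disposal.
enum DisposalMode {
    RUNNING_JOB_CLASS_LOADER, // a job ID was given: reuse that job's user code class loader
    UPLOADED_JARS,            // a JAR was given: upload it and build a class loader from it
    DEFAULT_CLASS_LOADER      // neither: only works if the savepoint contains no user classes
}

final class DisposalModeSelector {
    private DisposalModeSelector() {}

    /** Mirrors the if/else order of the diff: a job ID takes precedence over a JAR. */
    static DisposalMode select(String jobId, String jarFile) {
        if (jobId != null) {
            return DisposalMode.RUNNING_JOB_CLASS_LOADER;
        } else if (jarFile != null) {
            return DisposalMode.UPLOADED_JARS;
        } else {
            return DisposalMode.DEFAULT_CLASS_LOADER;
        }
    }
}
```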



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68588625
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    --- End diff --
    
    Oh, good catch!



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    Full ack, but the problem is not just RocksDB: it also affects savepoints that reference a user class in a state descriptor, including FS snapshots. So if you configure a file backend and use folding or reducing state, you run into the issue as well.
    
    What about the following: make the arguments optional and fail with a proper error message in case of a missing user code class. When `SubtaskState` (previously `StateForTask`) fails to dispose its state snapshots, it currently only logs the exception. We can change this to rethrow, and then give a proper error message when savepoint disposal fails.
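The proper error message could be produced by walking the cause chain of the rethrown failure and, if a `ClassNotFoundException` is found, telling the user which class is missing and how to supply it. A sketch; `DisposalErrors.describe` is a hypothetical helper and the message wording is illustrative, while the `-j <Jar>` flag and the job ID argument follow the help text in the `CliFrontendParser` diff:

```java
// Hypothetical helper: turns a disposal failure into a user-facing message.
final class DisposalErrors {
    private DisposalErrors() {}

    static String describe(String savepointPath, Throwable failure) {
        // Walk the cause chain; the missing class may be wrapped several times.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof ClassNotFoundException) {
                return "Failed to dispose savepoint at '" + savepointPath
                        + "': class " + t.getMessage() + " is not available."
                        + " Provide the job ID of a running job or the job JAR (-j <Jar>).";
            }
        }
        // Not a class loading problem: report the original failure as-is.
        return "Failed to dispose savepoint at '" + savepointPath + "': " + failure;
    }
}
```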



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68579929
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    --- End diff --
    
    Have we actually tested somewhere that the jars we're uploading to the JobManager allow us to dispose the given savepoint with user code?



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    Thanks for the review. I will propagate the errors, make the job ID/JAR arguments optional, and try to simplify parts of the CLI as you suggested.
    
    Regarding including the RocksDB jar in dist, I think that should be handled as a separate issue.



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2083



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    If there are no objections, I would like to merge this.



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68590084
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    +					new File(jarFile),
    +					options.getClasspaths(),
    +					options.getEntryPointClass(),
    +					options.getProgramArgs());
    +			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
    +
    +			JobGraph jobGraph;
    +			if (flinkPlan instanceof StreamingPlan) {
    +				jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
    +			} else {
    +				JobGraphGenerator gen = new JobGraphGenerator(this.config);
    +				jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
     			}
    -			catch (Exception e) {
    -				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
    +
    +			for (URL jar : program.getAllLibraries()) {
    +				try {
    +					jobGraph.addJar(new Path(jar.toURI()));
    +				} catch (URISyntaxException e) {
    +					throw new RuntimeException("URL is invalid. This should not happen.", e);
    +				}
     			}
     
    +			jobGraph.setClasspaths(program.getClasspaths());
    +
    +			logAndSysout("Uploading JAR files for savepoint disposal.");
    +			JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepointWithClassLoader(
    +							savepointPath,
    +							jobGraph.getUserJarBlobKeys(),
    +							jobGraph.getClasspaths()),
    +					clientTimeout);
    --- End diff --
    
    Let me check whether it is sufficient, but the `PackagedProgram.extractContainedLibaries` call looks good.



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68580226
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    +					new File(jarFile),
    +					options.getClasspaths(),
    +					options.getEntryPointClass(),
    +					options.getProgramArgs());
    +			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
    +
    +			JobGraph jobGraph;
    +			if (flinkPlan instanceof StreamingPlan) {
    +				jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
    +			} else {
    +				JobGraphGenerator gen = new JobGraphGenerator(this.config);
    +				jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
     			}
    -			catch (Exception e) {
    -				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
    +
    +			for (URL jar : program.getAllLibraries()) {
    +				try {
    +					jobGraph.addJar(new Path(jar.toURI()));
    +				} catch (URISyntaxException e) {
    +					throw new RuntimeException("URL is invalid. This should not happen.", e);
    +				}
     			}
     
    +			jobGraph.setClasspaths(program.getClasspaths());
    +
    +			logAndSysout("Uploading JAR files for savepoint disposal.");
    +			JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepointWithClassLoader(
    +							savepointPath,
    +							jobGraph.getUserJarBlobKeys(),
    +							jobGraph.getClasspaths()),
    +					clientTimeout);
    --- End diff --
    
    Do we really have to go through all this to extract the jar file paths? Wouldn't it be sufficient to do something like this:
    
    ```
    // Dispose with uploaded user code loader
    JobGraph jobGraph = new JobGraph("foobar");
    
    jobGraph.addJar(new Path(new File(jarFile).getAbsoluteFile().toURI()));
    
    // extract libraries
    List<File> libraries = PackagedProgram.extractContainedLibaries(new File(jarFile).getAbsoluteFile().toURI().toURL());
    
    for (File jar : libraries) {
    	jobGraph.addJar(new Path(jar.toURI()));
    }
    
    logAndSysout("Uploading JAR files for savepoint disposal.");
    JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
    
    response = jobManager.ask(
    		new JobManagerMessages.DisposeSavepointWithClassLoader(
    				savepointPath,
    				jobGraph.getUserJarBlobKeys(),
    				options.getClasspaths()),
    		clientTimeout);
    ```
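The suggestion above relies on `PackagedProgram.extractContainedLibaries` to pull nested libraries out of the user JAR. A plain-JDK sketch of the same idea, assuming nested libraries are stored as `lib/*.jar` entries inside the fat JAR; `NestedJarExtractor` is a hypothetical name and this is not Flink's implementation:

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

final class NestedJarExtractor {
    private NestedJarExtractor() {}

    /** Copies every lib/*.jar entry of the given JAR into its own temporary file. */
    static List<File> extractNestedLibraries(File jar) throws IOException {
        List<File> extracted = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (!entry.isDirectory() && name.startsWith("lib/") && name.endsWith(".jar")) {
                    File target = File.createTempFile("nested-lib-", ".jar");
                    // Safety net only; callers should still delete the files explicitly
                    // (compare deleteExtractedLibraries in the review above).
                    target.deleteOnExit();
                    try (InputStream in = jarFile.getInputStream(entry)) {
                        Files.copy(in, target.toPath(), StandardCopyOption.REPLACE_EXISTING);
                    }
                    extracted.add(target);
                }
            }
        }
        return extracted;
    }
}
```

Each returned file can then be added as a user JAR, next to the main JAR itself.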



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68581644
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    +					new File(jarFile),
    +					options.getClasspaths(),
    +					options.getEntryPointClass(),
    +					options.getProgramArgs());
    +			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
    +
    +			JobGraph jobGraph;
    +			if (flinkPlan instanceof StreamingPlan) {
    +				jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
    +			} else {
    +				JobGraphGenerator gen = new JobGraphGenerator(this.config);
    +				jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
     			}
    -			catch (Exception e) {
    -				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
    +
    +			for (URL jar : program.getAllLibraries()) {
    +				try {
    +					jobGraph.addJar(new Path(jar.toURI()));
    +				} catch (URISyntaxException e) {
    +					throw new RuntimeException("URL is invalid. This should not happen.", e);
    +				}
     			}
     
    +			jobGraph.setClasspaths(program.getClasspaths());
    +
    +			logAndSysout("Uploading JAR files for savepoint disposal.");
    +			JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
    --- End diff --
    
    Maybe we could also refactor the current code so that the `JobGraph` no longer uploads the JARs to the BLOB server. Then we wouldn't have to construct a `JobGraph` at all. I also think that uploading the user code JARs shouldn't be the responsibility of the `JobGraph`; it should simply store the file names.
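A minimal plain-Java sketch of the reviewer's suggestion: the client keeps only the JAR file locations and leaves uploading to a separate step. It assumes the libraries arrive as `java.net.URL` values, as the `for (URL jar : program.getAllLibraries())` loop in the diff suggests. The class `UserJarPaths` is hypothetical and uses `java.nio.file.Path`, not Flink's own `Path` type; the upload itself is not shown.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder: stores user code JAR locations instead of uploading them.
final class UserJarPaths {

    private UserJarPaths() {
    }

    /** Converts library URLs into local file paths without contacting any server. */
    static List<Path> toPaths(List<URL> libraries) {
        List<Path> paths = new ArrayList<>(libraries.size());
        for (URL jar : libraries) {
            try {
                // Paths.get(URI) resolves file: URIs against the default file system.
                paths.add(Paths.get(jar.toURI()));
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("Invalid JAR URL: " + jar, e);
            }
        }
        return Collections.unmodifiableList(paths);
    }
}
```

Keeping the conversion separate from any upload logic means a disposal code path like the one in the diff would not need to build a `JobGraph` just to get at the JARs.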


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68572931
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
    @@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
     		formatter.setSyntaxPrefix("  \"savepoint\" action options:");
     		formatter.printHelp(" ", getSavepointOptions(new Options()));
     		System.out.println();
    +		System.out.println("\n  Examples:");
    +		System.out.println("  - Trigger savepoint: bin/flink savepoint <Job ID>");
    +		System.out.println("  - Dispose savepoint:");
    +		System.out.println("    * For a running job: bin/flink savepoint -d <Path> <Job ID>");
    +		System.out.println("    * For a terminated job: bin/flink savepoint -d <Path> -j <Jar> [-c <mainClass> -C <classPath>]");
    --- End diff --
    
    Is it consistent that we specify the job ID without an option flag, whereas we specify the JAR with `-j`? Shouldn't only parameters that are used everywhere be passed without an option flag?



[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68588080
  
    --- Diff: docs/apis/cli.md ---
    @@ -279,13 +289,27 @@ guarantees for a stop request.
     
     Action "savepoint" triggers savepoints for a running job or disposes existing ones.
     
    -  Syntax: savepoint [OPTIONS] <Job ID>
    -  "savepoint" action options:
    -     -d,--dispose <savepointPath>   Disposes an existing savepoint.
    -     -m,--jobmanager <host:port>    Address of the JobManager (master) to which
    --- End diff --
    
    I think it should still be there, but let me check again.



[GitHub] flink issue #2083: [FLINK-3713] [clients, runtime] Use user code class loade...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2083
  
    I've addressed the comments:
    - Moved the JAR uploading to helper methods:
    
        ```java
        // Retrieve BLOB server address and upload JARs to server
        uploadJarFiles(ActorGateway, FiniteDuration, List<Path>);
        // Upload JARs to server
        uploadJarFiles(InetSocketAddress, List<Path>);
        ```
        I did not remove the `JobGraph` method that uploads JARs, but it now calls these helpers. I decided against removing it because it is called in multiple places with the same pattern (upload, get BLOB keys, set BLOB keys).
    - I extract the libraries as you suggested (no need for the job graph now)
    - Removed the job ID variant because it overloaded disposal too much, and kept only the JAR variant, which is now optional (so the script API does not break)
    - State disposal errors are now propagated, and a disposal that fails with a `ClassNotFoundException` gives a hint to provide the JAR file
    - I've added an automated test with custom KV state in `ClassLoaderITCase`
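To illustrate the error hint described in the list above, here is a minimal stand-alone sketch. It assumes the disposal failure reaches the client as a `Throwable` whose cause chain may contain a `ClassNotFoundException`. The class `DisposalErrors`, its method names, and the hint wording are hypothetical; they are not the code or message from the PR.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper: adds a hint when savepoint disposal failed on a missing user class.
final class DisposalErrors {

    private DisposalErrors() {
    }

    /** Returns true if the throwable or any of its causes is a ClassNotFoundException. */
    static boolean causedByMissingClass(Throwable t) {
        // Track visited throwables so a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof ClassNotFoundException) {
                return true;
            }
        }
        return false;
    }

    /** Wraps the failure and, if a user class was missing, asks the user to provide the JAR. */
    static Exception describeFailure(String savepointPath, Throwable cause) {
        String message = "Disposing the savepoint at '" + savepointPath + "' failed.";
        if (causedByMissingClass(cause)) {
            message += " The savepoint state seems to reference user classes."
                    + " Please provide the job JAR file so that they can be loaded.";
        }
        return new Exception(message, cause);
    }
}
```

The sketch walks the whole cause chain because the `ClassNotFoundException` may be wrapped in other exceptions by the time it reaches the client; the original failure is kept as the cause so no stack trace is lost.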
    


