Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/04/24 09:41:41 UTC

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5903

    [FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC

    ## What is the purpose of the change
    
    This PR reworks the `JarRunHandler` to submit the job to the dispatcher via RPC, instead of setting up a `RestClusterClient` and going through the client's job-submission routine.
    
    The reasoning is that the existing behavior was causing issues on Kubernetes, and this change also removes a special case, as this was the only handler that actively sent out REST requests.
    
    ## Brief change log
    
    * `JarRunHandler` now has access to `DispatcherGateway`
    * `JarRunHandler` now uploads the jar and submits the job via RPC (condensed sketch below)
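
    A condensed sketch of the new flow, adapted from the diff quoted in the review comments below (the final `gateway.submitJob(jobGraph, timeout)` call is an assumption here, since the quoted diff is truncated before the actual submission line):

        // ask the Dispatcher for its blob server port via RPC
        CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);

        // once the JobGraph is compiled and the port is known, upload the user jars to the blob server
        CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
            final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
            try {
                for (PermanentBlobKey key : BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars())) {
                    jobGraph.addBlob(key);
                }
            } catch (IOException ioe) {
                throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
            }
            return jobGraph;
        });

        // finally, hand the job graph to the Dispatcher directly via RPC instead of going through a RestClusterClient
        CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));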
    
    ## Verifying this change
    
    * run `JarRunHandlerTest`
    * submit a job through the web UI
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9211

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5903.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5903
    
----
commit e662220a50ed2f430cfc15082af11ab24f233bf9
Author: zentol <ch...@...>
Date:   2018-04-23T10:35:51Z

    [FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC

commit 51772685541adb185ffebcc5800d4eb6e60d35d3
Author: zentol <ch...@...>
Date:   2018-04-24T09:34:11Z

    add test

----


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5903


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5903#discussion_r184974645
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -105,9 +110,32 @@ public JarRunHandler(
     			savepointRestoreSettings,
     			parallelism);
     
    -		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
    -			.submitJob(jobGraph)
    -			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +			final List<PermanentBlobKey> keys;
    +			try {
    +				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
    +			} catch (IOException ioe) {
    +				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
    +			}
    +
    +			for (PermanentBlobKey key : keys) {
    +				jobGraph.addBlob(key);
    +			}
    +
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
    +			// we have to enable queued scheduling because slots will be allocated lazily
    --- End diff --
    
    gotcha. I think that's way off topic, so this is still good to merge


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5903#discussion_r184950890
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -105,9 +110,32 @@ public JarRunHandler(
     			savepointRestoreSettings,
     			parallelism);
     
    -		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
    -			.submitJob(jobGraph)
    -			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +			final List<PermanentBlobKey> keys;
    +			try {
    +				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
    +			} catch (IOException ioe) {
    +				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
    +			}
    +
    +			for (PermanentBlobKey key : keys) {
    +				jobGraph.addBlob(key);
    +			}
    +
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
    +			// we have to enable queued scheduling because slots will be allocated lazily
    --- End diff --
    
    enabling queued scheduling


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5903#discussion_r184930213
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -105,9 +110,32 @@ public JarRunHandler(
     			savepointRestoreSettings,
     			parallelism);
     
    -		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
    -			.submitJob(jobGraph)
    -			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +			final List<PermanentBlobKey> keys;
    +			try {
    +				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
    +			} catch (IOException ioe) {
    +				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
    +			}
    +
    +			for (PermanentBlobKey key : keys) {
    +				jobGraph.addBlob(key);
    +			}
    +
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
    +			// we have to enable queued scheduling because slots will be allocated lazily
    --- End diff --
    
    Is this also what the thing sitting behind the REST endpoint does?


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5903#discussion_r184953698
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -105,9 +110,32 @@ public JarRunHandler(
     			savepointRestoreSettings,
     			parallelism);
     
    -		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
    -			.submitJob(jobGraph)
    -			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +			final List<PermanentBlobKey> keys;
    +			try {
    +				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
    +			} catch (IOException ioe) {
    +				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
    +			}
    +
    +			for (PermanentBlobKey key : keys) {
    +				jobGraph.addBlob(key);
    +			}
    +
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
    +			// we have to enable queued scheduling because slots will be allocated lazily
    --- End diff --
    
    If by `the thing sitting behind the REST endpoint` you mean the `Dispatcher`, then no, it does not enable queued scheduling, but it effectively relies on it being enabled. If you don't enable it beforehand in the client/handler, the job submission will outright fail with an entirely unhelpful error message.
    
    The `Dispatcher` should _probably_ enable it though, but that isn't really in the scope of this PR as it would also affect the `RestClusterClient`, which does the same thing.
    Technically it may also be questionable, as it would invalidate part of the `JobGraph` API.
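
    For context, the handler does roughly the following right where the quoted diff cuts off (the exact lines aren't shown above, so take this as a sketch; the `RestClusterClient` follows the same pattern before submission):

        // slots are allocated lazily, so queued scheduling must be enabled before the graph reaches the Dispatcher
        jobGraph.setAllowQueuedScheduling(true);
        return gateway.submitJob(jobGraph, timeout);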


---

[GitHub] flink issue #5903: [FLINK-9211][REST] JarRunHandler submits job to Dispatche...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5903
  
    merging.


---

[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5903#discussion_r184936545
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -105,9 +110,32 @@ public JarRunHandler(
     			savepointRestoreSettings,
     			parallelism);
     
    -		return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
    -			.submitJob(jobGraph)
    -			.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +			final List<PermanentBlobKey> keys;
    +			try {
    +				keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
    +			} catch (IOException ioe) {
    +				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
    +			}
    +
    +			for (PermanentBlobKey key : keys) {
    +				jobGraph.addBlob(key);
    +			}
    +
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
    +			// we have to enable queued scheduling because slots will be allocated lazily
    --- End diff --
    
    what do you mean?


---