Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/04/24 09:41:41 UTC
[GitHub] flink pull request #5903: [FLINK-9211][REST] JarRunHandler submits job to Di...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/5903
[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC
## What is the purpose of the change
This PR reworks the `JarRunHandler` to submit the job to the dispatcher via RPC, instead of setting up a `RestClusterClient` and going through the client's job-submission routine.
The existing behavior was causing issues on Kubernetes. The change also removes a special case, since `JarRunHandler` was the only handler that actively sent out REST requests.
## Brief change log
* `JarRunHandler` now has access to `DispatcherGateway`
* `JarRunHandler` now uploads the job's jars to the blob server and submits the job via RPC
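The RPC-based flow in the change log (look up the blob server port, upload the jars, then submit the job graph) can be sketched with plain `CompletableFuture`s. This is a minimal, self-contained model and not the code in this PR. `SketchJobGraph`, `uploadJars` and `submitJobRpc` are hypothetical stand-ins for Flink's `JobGraph`, `BlobClient.uploadJarFiles` and the `DispatcherGateway` call, and the host and port are arbitrary. The chain resolves to the job ID, which the real handler wraps in a `JarRunResponseBody`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical model of the RPC-based submission chain; none of these types are Flink's. */
public class RpcSubmissionSketch {

    /** Stand-in for a job graph: an ID, the user jars, and the blob keys added after upload. */
    static final class SketchJobGraph {
        final String jobId;
        final List<String> userJars;
        final List<String> blobKeys = new ArrayList<>();

        SketchJobGraph(String jobId, List<String> userJars) {
            this.jobId = jobId;
            this.userJars = userJars;
        }
    }

    /** Stand-in for a blocking blob upload that reports failures as a checked exception. */
    static List<String> uploadJars(String host, int port, SketchJobGraph graph) throws IOException {
        if (port <= 0) {
            throw new IOException("invalid blob server port: " + port);
        }
        List<String> keys = new ArrayList<>();
        for (String jar : graph.userJars) {
            keys.add(host + ":" + port + "/" + graph.jobId + "/" + jar);
        }
        return keys;
    }

    /** Stand-in for the dispatcher RPC; here it acknowledges every submission. */
    static CompletableFuture<String> submitJobRpc(SketchJobGraph graph) {
        return CompletableFuture.completedFuture("ACK");
    }

    /** Combines graph and port, uploads the jars, submits, and resolves to the job ID. */
    static CompletableFuture<String> runJar(
            CompletableFuture<SketchJobGraph> jobGraphFuture,
            CompletableFuture<Integer> blobServerPortFuture,
            String dispatcherHost) {

        CompletableFuture<SketchJobGraph> jarUploadFuture = jobGraphFuture.thenCombine(
                blobServerPortFuture,
                (jobGraph, blobServerPort) -> {
                    final List<String> keys;
                    try {
                        keys = uploadJars(dispatcherHost, blobServerPort, jobGraph);
                    } catch (IOException ioe) {
                        // The lambda cannot throw a checked exception, so wrap it; the
                        // future then completes exceptionally with this CompletionException.
                        throw new CompletionException(
                                new Exception("Could not upload job jar files.", ioe));
                    }
                    jobGraph.blobKeys.addAll(keys);
                    return jobGraph;
                });

        // Submit only after the upload succeeded, then map the acknowledgement to the job ID.
        return jarUploadFuture.thenCompose(
                jobGraph -> submitJobRpc(jobGraph).thenApply(ack -> jobGraph.jobId));
    }

    public static void main(String[] args) {
        SketchJobGraph graph = new SketchJobGraph("job-1", Arrays.asList("app.jar"));
        String jobId = runJar(
                CompletableFuture.completedFuture(graph),
                CompletableFuture.completedFuture(50100), // arbitrary example port
                "dispatcher-host").join();
        System.out.println(jobId + " " + graph.blobKeys);
    }
}
```

`thenCombine` runs the upload only once both the job graph and the port are available. Wrapping the checked `IOException` in a `CompletionException` lets the failure travel down the chain, so `join()` on the result throws a `CompletionException` whose cause carries the upload error.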
## Verifying this change
* run `JarRunHandlerTest`
* submit a job through the web UI
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 9211
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5903.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5903
----
commit e662220a50ed2f430cfc15082af11ab24f233bf9
Author: zentol <ch...@...>
Date: 2018-04-23T10:35:51Z
[FLINK-9211][REST] JarRunHandler submits job to Dispatcher via RPC
commit 51772685541adb185ffebcc5800d4eb6e60d35d3
Author: zentol <ch...@...>
Date: 2018-04-24T09:34:11Z
add test
----
---
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5903
---
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5903#discussion_r184974645
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
@@ -105,9 +110,32 @@ public JarRunHandler(
savepointRestoreSettings,
parallelism);
- return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
- .submitJob(jobGraph)
- .thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
+ CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+ CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+ final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+ final List<PermanentBlobKey> keys;
+ try {
+ keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+ } catch (IOException ioe) {
+ throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ }
+
+ for (PermanentBlobKey key : keys) {
+ jobGraph.addBlob(key);
+ }
+
+ return jobGraph;
+ });
+
+ CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
+ // we have to enable queued scheduling because slots will be allocated lazily
--- End diff --
gotcha. I think that's way off topic, so this is still good to merge
---
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5903#discussion_r184950890
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
+ // we have to enable queued scheduling because slots will be allocated lazily
--- End diff --
enabling queued scheduling
---
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5903#discussion_r184930213
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
+ // we have to enable queued scheduling because slots will be allocated lazily
--- End diff --
Is this also what the thing sitting behind the REST endpoint does?
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5903#discussion_r184953698
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
+ // we have to enable queued scheduling because slots will be allocated lazily
--- End diff --
If by `the thing sitting behind the REST endpoint` you mean the `Dispatcher`, then no: it does not enable queued scheduling, but effectively relies on it being enabled. If you don't enable it beforehand in the client/handler, the job submission will outright fail with an entirely unhelpful error message.
The `Dispatcher` should _probably_ enable it, though that isn't really in the scope of this PR, as it would also affect the `RestClusterClient`, which does the same thing.
Technically it may also be questionable, as it would invalidate part of the `JobGraph` API.
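The constraint described here (the caller enables queued scheduling, the `Dispatcher` only relies on it) can be modeled in a few lines. This is a hypothetical sketch: `QueuedJobGraph`, `StrictDispatcher` and `submitFromHandler` are invented names, and the rejection message is made up rather than Flink's actual error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical model: a dispatcher that relies on, but never sets, queued scheduling. */
public class QueuedSchedulingSketch {

    /** Stand-in job graph; the flag starts out disabled. */
    static final class QueuedJobGraph {
        final String jobId;
        private boolean allowQueuedScheduling = false;

        QueuedJobGraph(String jobId) {
            this.jobId = jobId;
        }

        void setAllowQueuedScheduling(boolean allow) {
            this.allowQueuedScheduling = allow;
        }

        boolean getAllowQueuedScheduling() {
            return allowQueuedScheduling;
        }
    }

    /** Slots are allocated lazily, so a graph that cannot wait for them is rejected. */
    static final class StrictDispatcher {
        CompletableFuture<String> submitJob(QueuedJobGraph jobGraph) {
            if (!jobGraph.getAllowQueuedScheduling()) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                // Invented message; the real failure is described as far less helpful.
                failed.completeExceptionally(new IllegalStateException(
                        "job " + jobGraph.jobId + " was submitted without queued scheduling"));
                return failed;
            }
            return CompletableFuture.completedFuture("ACK");
        }
    }

    /** What the handler and the REST client both do: set the flag, then submit. */
    static CompletableFuture<String> submitFromHandler(StrictDispatcher dispatcher, QueuedJobGraph jobGraph) {
        jobGraph.setAllowQueuedScheduling(true);
        return dispatcher.submitJob(jobGraph);
    }

    public static void main(String[] args) {
        StrictDispatcher dispatcher = new StrictDispatcher();
        System.out.println(submitFromHandler(dispatcher, new QueuedJobGraph("job-1")).join());
        try {
            dispatcher.submitJob(new QueuedJobGraph("job-2")).join();
        } catch (CompletionException e) {
            System.out.println("rejected: " + e.getCause().getMessage());
        }
    }
}
```

Moving the `setAllowQueuedScheduling(true)` call into `StrictDispatcher.submitJob` would model the alternative of letting the `Dispatcher` enable it. The caller-facing flag on the job graph would then no longer have any effect, which is the API concern raised here.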
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5903
merging.
---
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5903#discussion_r184936545
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
+ // we have to enable queued scheduling because slots will be allocated lazily
--- End diff --
what do you mean?
---