Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/02/16 14:17:51 UTC
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
GitHub user GJL opened a pull request:
https://github.com/apache/flink/pull/5509
[FLINK-7715][flip6] Port JarRunHandler to new REST endpoint
## What is the purpose of the change
*Port JarRunHandler to new REST endpoint.*
## Brief change log
- *Add JarRunHandler.*
## Verifying this change
This change added tests and can be verified as follows:
- *Manually submitted a job.*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/GJL/flink FLINK-7715
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5509.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5509
----
commit e7d389e6522ffb011afc497db430f37b2acc7098
Author: gyao <ga...@...>
Date: 2018-02-14T19:47:11Z
[FLINK-7711][flip6] Implement JarListHandler
This closes #5209.
This closes #5455.
commit 62ad9be6fe3995d8c23614960307b045d361962c
Author: Till Rohrmann <tr...@...>
Date: 2018-02-13T14:33:11Z
[FLINK-8647] [flip6] Introduce JobMasterConfiguration
This commit introduces a JobMasterConfiguration which contains JobMaster specific
configuration settings.
This closes #5478.
commit a049b2387ca4934503bb77566407c06945a0d670
Author: Till Rohrmann <tr...@...>
Date: 2018-02-15T10:05:35Z
[FLINK-8610] [flip6] Remove RestfulGateway from JobMasterGateway
The JobMaster no longer needs to implement the RestfulGateway. Therefore,
it is removed by this commit.
This closes #5433.
commit abc95cb6a24a5cd631cc6bad86d11788cd09a58b
Author: Till Rohrmann <tr...@...>
Date: 2018-02-07T13:12:54Z
[hotfix] Fix checkstyle violations in JobExecutionResult
commit 9cfbe5798459ad7b43224e35b0ed57ae93613efe
Author: Till Rohrmann <tr...@...>
Date: 2018-02-15T10:08:04Z
[FLINK-8611] [flip6] Add result future to JobManagerRunner
This commit adds a CompletableFuture<ArchivedExecutionGraph> to the
JobManagerRunner. This future will be completed once the job has
reached a globally terminal state.
This closes #5434.
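The idea of a result future that completes only on a globally terminal state can be sketched as follows. This is a minimal illustration, not the actual `JobManagerRunner` code: the class `ResultFutureSketch`, the simplified `JobState` enum, and the method `onStateChange` are hypothetical names, and the future carries the state itself rather than an `ArchivedExecutionGraph`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a runner that exposes a result future. */
final class ResultFutureSketch {

    /** Simplified job states (assumption); only some of them are globally terminal. */
    enum JobState {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        private final boolean globallyTerminal;

        JobState(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminal() {
            return globallyTerminal;
        }
    }

    private final CompletableFuture<JobState> resultFuture = new CompletableFuture<>();

    CompletableFuture<JobState> getResultFuture() {
        return resultFuture;
    }

    /** Called on every state change; completes the future only for a terminal state. */
    void onStateChange(JobState newState) {
        if (newState.isGloballyTerminal()) {
            resultFuture.complete(newState);
        }
    }

    public static void main(String[] args) {
        ResultFutureSketch runner = new ResultFutureSketch();
        runner.onStateChange(JobState.RUNNING);
        System.out.println(runner.getResultFuture().isDone()); // prints "false"
        runner.onStateChange(JobState.FINISHED);
        System.out.println(runner.getResultFuture().join());   // prints "FINISHED"
    }
}
```

Callers can then chain on the future (for example with `thenAccept`) instead of polling the job status.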
commit c08acd6fa6484e2aec4a74dd104c7cda7eda05c9
Author: Till Rohrmann <tr...@...>
Date: 2018-02-15T10:16:12Z
[FLINK-8612] [flip6] Enable non-detached job mode
The non-detached job mode waits until it has served the JobResult of
a completed job at least once before it terminates.
This closes #5435.
commit d0ee3936279653eff9d06fe7a7746be1e5cc6bc3
Author: zjureel <zj...@...>
Date: 2017-12-19T09:07:56Z
[FLINK-7857][flip6] Port JobVertexDetailsHandler to REST endpoint
commit 8559d496c09a4ba050fb86155332f72b35df6935
Author: gyao <ga...@...>
Date: 2018-02-15T10:32:15Z
[FLINK-7857][flip6] Return status 404 if JobVertex is unknown
This closes #5493.
This closes #5035.
commit d2344c98759d6616455833cb3c1083fec12fc56a
Author: Till Rohrmann <tr...@...>
Date: 2018-02-15T10:37:58Z
[FLINK-8662] [tests] Harden FutureUtilsTest#testRetryWithDelay
This commit moves the start of the time measurement before the triggering of
the retry with delay operation.
This closes #5494.
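Why the order matters can be shown with a small sketch. It is not the code of `FutureUtilsTest`; the class `DelayTimingSketch` and the helper `completeAfter` are hypothetical. If the clock is started before the delayed operation is triggered, the measured time can never be shorter than the delay, so a lower-bound assertion is stable. If the clock is started afterwards, the measurement may come out slightly shorter than the delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayTimingSketch {

    /** Completes the returned future after the given delay (hypothetical helper). */
    static CompletableFuture<Void> completeAfter(ScheduledExecutorService executor, long delayMillis) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        executor.schedule(() -> { future.complete(null); }, delayMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Returns the elapsed milliseconds, with the clock started BEFORE the operation is triggered. */
    static long measure(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();               // start the clock first
            completeAfter(executor, delayMillis).join();  // then trigger the operation and wait
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long delay = 50L;
        long elapsed = measure(delay);
        System.out.println("elapsed >= delay: " + (elapsed >= delay));
    }
}
```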
commit 9a00b3f20a904b5e93b9c48ea11950f715a530f4
Author: Till Rohrmann <tr...@...>
Date: 2018-02-07T17:21:06Z
[hotfix] [yarn] Write number of slots to configuration
commit cc91b6ac0f67093beb9c17c6fa1beb16a0380b3b
Author: Till Rohrmann <tr...@...>
Date: 2018-02-07T17:58:32Z
[hotfix] [yarn] Remove unnecessary TaskManager configuration generation
commit 9c820e91730b517ca660275c80370714f4729e36
Author: Till Rohrmann <tr...@...>
Date: 2018-02-08T09:27:27Z
[hotfix] Only log retrying exception on debug in RetryingRegistration
commit 633c78699083a21ca0578d5da34042f8c3368292
Author: Till Rohrmann <tr...@...>
Date: 2018-01-30T08:22:03Z
[FLINK-8614] [flip6] Activate Flip-6 mode per default
This commit enables the Flip-6 mode by default. Additionally, it disables
some of the Yarn tests that no longer apply to Flip-6 (tests that wait for
a number of started TM containers without a job submission).
This closes #5437.
commit 5b7c5415ff2b942f23c477d0ff10c33269530e63
Author: gyao <ga...@...>
Date: 2018-02-16T14:00:23Z
[hotfix] Remove unused code in JarActionHandler
commit d070ed478263afbab5e43c99d3d7c1d61f618145
Author: gyao <ga...@...>
Date: 2018-02-16T14:15:19Z
[FLINK-7715][flip6] Implement JarRunHandler
----
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5509
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168768261
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JarRunHandler}.
+ */
+public class JarRunHandlerTest {
--- End diff --
Maybe add tests for business logic.
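One piece of business logic that lends itself to a unit test is the tokenization of program arguments. The sketch below reuses the `ARGUMENTS_TOKENIZE_PATTERN` expression from the `JarRunHandler` diff in this pull request. The surrounding class `ArgumentTokenizerSketch` and the method `tokenize` are hypothetical, and the loop that strips the quotes is an assumption about how the pattern is applied, not a copy of the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ArgumentTokenizerSketch {

    // Same expression as ARGUMENTS_TOKENIZE_PATTERN in the JarRunHandler diff.
    private static final Pattern ARGUMENTS_TOKENIZE_PATTERN =
        Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");

    /** Splits an argument string on whitespace while keeping quoted parts together. */
    static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(args);
        while (matcher.find()) {
            // Drop trailing whitespace and the surrounding quotes (assumed behavior).
            tokens.add(matcher.group().trim().replace("\"", "").replace("\'", ""));
        }
        return tokens;
    }

    public static void main(String[] args) {
        // prints "[--input, a b, c d, e]"
        System.out.println(tokenize("--input \"a b\" 'c d' e"));
    }
}
```

A test along these lines needs no running cluster, since it only exercises string handling.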
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168768024
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -1126,7 +1126,7 @@ public static void setJobManagerAddressInConfig(Configuration config, InetSocket
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
- private static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
+ public static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
--- End diff --
Maybe move method to utils class.
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168922506
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+ AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+
+ private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+ private final Path jarDir;
+
+ private final Configuration configuration;
+
+ private final Executor executor;
+
+ private final RestClusterClient<String> restClusterClient;
+
+ public JarRunHandler(
+ final CompletableFuture<String> localRestAddress,
+ final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+ final Time timeout,
+ final Map<String, String> responseHeaders,
+ final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
+ final Path jarDir,
+ final Configuration configuration,
+ final Executor executor) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+ this.jarDir = requireNonNull(jarDir);
+ this.configuration = requireNonNull(configuration);
+ this.executor = requireNonNull(executor);
+ try {
+ this.restClusterClient = new RestClusterClient<>(configuration, "");
--- End diff --
I think we should move the `RestClusterClient` creation out of the handler and into the `DispatcherEndpoint`. That way we can also enforce a proper shutdown of the client.
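The ownership model suggested here could look roughly like the sketch below. All types are hypothetical stand-ins (`Client` for `RestClusterClient`, `RunHandler` for the handler, `Endpoint` for the endpoint), and none of the method names are taken from Flink. The point is only that the component that creates the client is also the one that closes it, while the handler merely borrows it.

```java
import java.util.Objects;

final class ClientOwnershipSketch {

    /** Hypothetical stand-in for a client that holds resources. */
    static final class Client implements AutoCloseable {
        private boolean closed;

        String submit(String job) {
            if (closed) {
                throw new IllegalStateException("client already closed");
            }
            return "submitted:" + job;
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** The handler receives the client from outside and never closes it. */
    static final class RunHandler {
        private final Client client;

        RunHandler(Client client) {
            this.client = Objects.requireNonNull(client);
        }

        String handle(String job) {
            return client.submit(job);
        }
    }

    /** The endpoint creates the client, wires the handler, and closes the client on shutdown. */
    static final class Endpoint implements AutoCloseable {
        private final Client client = new Client();
        private final RunHandler handler = new RunHandler(client);

        RunHandler getHandler() {
            return handler;
        }

        Client getClient() {
            return client;
        }

        @Override
        public void close() {
            client.close();
        }
    }

    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint();
        try (Endpoint e = endpoint) {
            System.out.println(e.getHandler().handle("job-1")); // prints "submitted:job-1"
        }
        System.out.println(endpoint.getClient().isClosed());    // prints "true"
    }
}
```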
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168929157
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
@@ -0,0 +1,229 @@
+ try {
+ this.restClusterClient = new RestClusterClient<>(configuration, "");
--- End diff --
Hmm, I just realized that this is not so easy, since we would need `flink-clients` as a dependency where the `JarRunHandler` is initialized...
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168922479
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -1126,7 +1126,7 @@ public static void setJobManagerAddressInConfig(Configuration config, InetSocket
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
- private static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
+ public static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
--- End diff --
You're right, we should move it to a utility class.
---
[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5509#discussion_r168930002
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
@@ -0,0 +1,229 @@
+ try {
+ this.restClusterClient = new RestClusterClient<>(configuration, "");
--- End diff --
What do you think about a `DispatcherRestEndpoint` subclass which lives in `flink-runtime-web` and adds the web submission handler? This class would then be instantiated via the `WebMonitorUtils`.
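A rough sketch of this proposal, under stated assumptions: the base class lives in the module without the client dependency, the subclass lives in the web module and adds the submission handler, and a utility loads the subclass by name. Reflective loading is an assumption about how the instantiation would avoid a compile-time dependency. `BaseEndpoint`, `WebSubmissionEndpoint`, `loadEndpoint`, and the handler labels are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class WebSubmissionEndpointSketch {

    /** Stand-in for the base endpoint, which knows nothing about jar submission. */
    static class BaseEndpoint {
        protected List<String> initializeHandlers() {
            List<String> handlers = new ArrayList<>();
            handlers.add("job-overview"); // hypothetical handler label
            return handlers;
        }
    }

    /** Stand-in for the subclass in the web module that adds the submission handler. */
    static class WebSubmissionEndpoint extends BaseEndpoint {
        @Override
        protected List<String> initializeHandlers() {
            List<String> handlers = super.initializeHandlers();
            handlers.add("jar-run"); // hypothetical handler label
            return handlers;
        }
    }

    /** Loads the endpoint by class name; falls back to the base endpoint if it cannot be loaded. */
    static BaseEndpoint loadEndpoint(String className) {
        try {
            Class<? extends BaseEndpoint> clazz =
                Class.forName(className).asSubclass(BaseEndpoint.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return new BaseEndpoint();
        }
    }

    public static void main(String[] args) {
        BaseEndpoint withSubmission = loadEndpoint(WebSubmissionEndpoint.class.getName());
        System.out.println(withSubmission.initializeHandlers()); // prints "[job-overview, jar-run]"

        BaseEndpoint fallback = loadEndpoint("not.on.the.classpath.Endpoint");
        System.out.println(fallback.initializeHandlers());       // prints "[job-overview]"
    }
}
```

With this split, a deployment without the web module on the classpath would still start, only without the submission handler.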
---