You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 07:07:04 UTC
[5/5] flink git commit: [FLINK-4935] [webfrontend] Submit job with savepoint
[FLINK-4935] [webfrontend] Submit job with savepoint
This closes #2714.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6c88cec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6c88cec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6c88cec
Branch: refs/heads/master
Commit: c6c88cec4fcce2e8fcee2a9cfc14d7857d6b6b06
Parents: c0e620f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Oct 27 16:26:36 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 08:06:30 2016 +0100
----------------------------------------------------------------------
docs/monitoring/rest_api.md | 36 +++
.../webmonitor/handlers/JarActionHandler.java | 221 +++++++++++------
.../webmonitor/handlers/JarPlanHandler.java | 3 +-
.../webmonitor/handlers/JarRunHandler.java | 3 +-
.../handlers/JarActionHandlerTest.java | 92 +++++++
.../web-dashboard/app/partials/submit.jade | 7 +
.../web-dashboard/app/scripts/index.coffee | 2 +-
.../scripts/modules/submit/submit.ctrl.coffee | 6 +-
.../scripts/modules/submit/submit.svc.coffee | 8 +-
.../web-dashboard/web/css/vendor.css | 2 +-
flink-runtime-web/web-dashboard/web/js/index.js | 242 ++++++++++---------
.../web-dashboard/web/js/vendor.js | 2 +-
.../web-dashboard/web/partials/submit.html | 12 +-
.../jobgraph/SavepointRestoreSettings.java | 17 ++
14 files changed, 449 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/docs/monitoring/rest_api.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index e84e2cc..13ba85a 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -74,6 +74,11 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>`
- `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators`
- `/jobs/<jobid>/plan`
+ - `/jars/upload`
+ - `/jars`
+ - `/jars/:jarid`
+ - `/jars/:jarid/plan`
+ - `/jars/:jarid/run`
### General
@@ -652,3 +657,34 @@ The `savepointPath` points to the external path of the savepoint, which can be u
"cause": "<error message>"
}
~~~
+
+### Submitting Programs
+
+It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
+
+#### Run a Program (POST)
+
+Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `jobmanager.web.upload.dir`).
+
+You can specify the following query parameters (all optional):
+
+- **Program arguments**: `program-args=arg1 arg2 arg3`
+- **Main class to execute**: `entry-class=EntryClassName.class`
+- **Default parallelism**: `parallelism=4`
+- **Savepoint path to restore from**: `savepointPath=hdfs://path/to/savepoint`
+- **Allow non-restored state**: `allowNonRestoredState=true` (only read when `savepointPath` is set; any value other than `true` is treated as `false`)
+
+If the call succeeds, you will get a response with the ID of the submitted job.
+
+**Example:** Run program with a savepoint
+
+Request:
+~~~
+POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
+~~~
+
+Response:
+~~~
+{"jobid": "869a9868d49c679e7355700e0857af85"}
+~~~
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 1e23f1f..5abe117 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
@@ -34,6 +35,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ExceptionUtils;
import java.io.File;
@@ -56,43 +58,18 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
jarDir = jarDirectory;
}
- protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(Map<String, String> pathParams, Map<String, String> queryParams) throws Exception {
+ protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(JarActionHandlerConfig config) throws Exception {
// generate the graph
JobGraph graph = null;
- final String file = pathParams.get("jarid");
- if (file == null) {
- throw new IllegalArgumentException("No jarid was provided.");
- }
-
- final List<String> programArgs;
- // parse required params
- String param = queryParams.get("program-args");
- programArgs = (param != null && !param.equals("")) ? tokenizeArguments(param) : new ArrayList<String>();
-
- final String entryClassOpt = queryParams.get("entry-class");
- final String parallelismOpt = queryParams.get("parallelism");
-
- int parallelism = 1;
- String entryClass = null;
-
- if (parallelismOpt != null && !parallelismOpt.equals("")) {
- parallelism = Integer.parseInt(parallelismOpt);
- if (parallelism < 1) {
- throw new IllegalArgumentException("Parallelism must be a positive number.");
- }
- }
-
- // get entry class
- if (entryClassOpt != null && !entryClassOpt.equals("")) {
- entryClass = entryClassOpt;
- }
- PackagedProgram program = new PackagedProgram(new File(jarDir, file), entryClass,
- programArgs.toArray(new String[programArgs.size()]));
+ PackagedProgram program = new PackagedProgram(
+ new File(jarDir, config.getJarFile()),
+ config.getEntryClass(),
+ config.getProgramArgs());
ClassLoader classLoader = program.getUserCodeClassLoader();
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
- FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, parallelism);
+ FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, config.getParallelism());
if (plan instanceof StreamingPlan) {
graph = ((StreamingPlan) plan).getJobGraph();
@@ -102,6 +79,10 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
if (graph == null) {
throw new CompilerException("A valid job graph couldn't be generated for the jar.");
}
+
+ // Set the savepoint settings
+ graph.setSavepointRestoreSettings(config.getSavepointRestoreSettings());
+
for (URL jar : program.getAllLibraries()) {
try {
graph.addJar(new Path(jar.toURI()));
@@ -113,49 +94,6 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
return Tuple2.of(graph, classLoader);
}
- /**
- * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
- * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
- * takes care of quotes, such that quoted passages end up being one string.
- *
- * @param args
- * The string to be split.
- * @return The array of split strings.
- */
- private static List<String> tokenizeArguments(String args) {
- List<String> list = new ArrayList<String>();
- StringBuilder curr = new StringBuilder();
-
- int pos = 0;
- boolean quoted = false;
-
- while (pos < args.length()) {
- char c = args.charAt(pos);
- if ((c == ' ' || c == '\t') && !quoted) {
- if (curr.length() > 0) {
- list.add(curr.toString());
- curr.setLength(0);
- }
- } else if (c == '"') {
- quoted = !quoted;
- } else {
- curr.append(c);
- }
-
- pos++;
- }
-
- if (quoted) {
- throw new IllegalArgumentException("Unterminated quoted string.");
- }
-
- if (curr.length() > 0) {
- list.add(curr.toString());
- }
-
- return list;
- }
-
protected String sendError(Exception e) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -167,4 +105,139 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
return writer.toString();
}
+
+ /**
+ * Wrapper for all configuration that is parsed from query and path args.
+ */
+ @VisibleForTesting
+ static class JarActionHandlerConfig {
+
+ private final String jarFile;
+ private final String[] programArgs;
+ private final String entryClass;
+ private final int parallelism;
+ private final SavepointRestoreSettings savepointRestoreSettings;
+
+ JarActionHandlerConfig(
+ String jarFile,
+ String[] programArgs,
+ String entryClass,
+ int parallelism,
+ SavepointRestoreSettings savepointRestoreSettings) {
+
+ this.jarFile = jarFile;
+ this.programArgs = programArgs;
+ this.entryClass = entryClass;
+ this.parallelism = parallelism;
+ this.savepointRestoreSettings = savepointRestoreSettings;
+ }
+
+ public String getJarFile() {
+ return jarFile;
+ }
+
+ public String[] getProgramArgs() {
+ return programArgs;
+ }
+
+ public String getEntryClass() {
+ return entryClass;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public SavepointRestoreSettings getSavepointRestoreSettings() {
+ return savepointRestoreSettings;
+ }
+
+ public static JarActionHandlerConfig fromParams(Map<String, String> pathParams, Map<String, String> queryParams) {
+ // Jar file
+ String jarFile = pathParams.get("jarid");
+ if (jarFile == null) {
+ throw new IllegalArgumentException("No jarid was provided.");
+ }
+
+ // Program args
+ String[] programArgs = new String[0];
+ String programArgsOpt = queryParams.get("program-args");
+ if (programArgsOpt!= null && !programArgsOpt.equals("")) {
+ List<String> args = tokenizeArguments(programArgsOpt);
+ programArgs = args.toArray(new String[args.size()]);
+ }
+
+ // Entry class
+ String entryClass = null;
+ String entryClassOpt = queryParams.get("entry-class");
+ if (entryClassOpt != null && !entryClassOpt.equals("")) {
+ entryClass = entryClassOpt;
+ }
+
+ // Parallelism
+ int parallelism = 1;
+ String parallelismOpt = queryParams.get("parallelism");
+ if (parallelismOpt != null && !parallelismOpt.equals("")) {
+ parallelism = Integer.parseInt(parallelismOpt);
+ if (parallelism < 1) {
+ throw new IllegalArgumentException("Parallelism must be a positive number.");
+ }
+ }
+
+ // Savepoint restore settings
+ SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
+ String savepointPath = queryParams.get("savepointPath");
+ if (savepointPath != null && !savepointPath.equals("")) {
+ String allowNonRestoredOpt = queryParams.get("allowNonRestoredState");
+ boolean allowNonRestoredState = allowNonRestoredOpt != null && allowNonRestoredOpt.equals("true");
+ savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+ }
+
+ return new JarActionHandlerConfig(jarFile, programArgs, entryClass, parallelism, savepointSettings);
+ }
+
+ /**
+ * Utility method that takes the given arguments, splits them at whitespace (space and tab) and
+ * turns them into a list of Strings. Unlike <tt>StringTokenizer</tt>, this method
+ * takes care of quotes, such that quoted passages end up being one string.
+ *
+ * @param args
+ * The string to be split.
+ * @return The list of split strings.
+ */
+ private static List<String> tokenizeArguments(String args) {
+ List<String> list = new ArrayList<String>();
+ StringBuilder curr = new StringBuilder();
+
+ int pos = 0;
+ boolean quoted = false;
+
+ while (pos < args.length()) {
+ char c = args.charAt(pos);
+ if ((c == ' ' || c == '\t') && !quoted) {
+ if (curr.length() > 0) {
+ list.add(curr.toString());
+ curr.setLength(0);
+ }
+ } else if (c == '"') {
+ quoted = !quoted;
+ } else {
+ curr.append(c);
+ }
+
+ pos++;
+ }
+
+ if (quoted) {
+ throw new IllegalArgumentException("Unterminated quoted string.");
+ }
+
+ if (curr.length() > 0) {
+ list.add(curr.toString());
+ }
+
+ return list;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 3a95d6a..bd0a6af 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -39,7 +39,8 @@ public class JarPlanHandler extends JarActionHandler {
@Override
public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
- JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ JobGraph graph = getJobGraphAndClassLoader(config).f0;
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 8d3e57f..474be33 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -50,7 +50,8 @@ public class JarRunHandler extends JarActionHandler {
@Override
public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
- Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams);
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
try {
graph.f0.uploadUserJars(jobManager, timeout, clientConfig);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
new file mode 100644
index 0000000..fbac126
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.JarActionHandlerConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JarActionHandlerTest {
+
+ /**
+ * Test that the savepoint settings are correctly parsed.
+ */
+ @Test
+ public void testSavepointRestoreSettings() throws Exception {
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put("jarid", "required"); // required
+
+ // the following should be ignored, because they are parsed from the query params
+ pathParams.put("savepointPath", "ignored");
+ pathParams.put("allowNonRestoredState", "ignored");
+
+ Map<String, String> queryParams = new HashMap<>(); // <-- everything goes here
+
+ // Nothing configured
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
+
+ // Set path
+ queryParams.put("savepointPath", "the-savepoint-path");
+ queryParams.put("allowNonRestoredState", "");
+
+ SavepointRestoreSettings expected = SavepointRestoreSettings.forPath("the-savepoint-path", false);
+
+ config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ assertEquals(expected, config.getSavepointRestoreSettings());
+
+ // Set flag
+ queryParams.put("allowNonRestoredState", "true");
+
+ expected = SavepointRestoreSettings.forPath("the-savepoint-path", true);
+ config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ assertEquals(expected, config.getSavepointRestoreSettings());
+ }
+
+ /**
+ * Tests that empty String params are ignored.
+ */
+ @Test
+ public void testEmptyStringParams() throws Exception {
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put("jarid", "required"); // required
+ Map<String, String> queryParams = new HashMap<>();
+
+ queryParams.put("program-args", "");
+ queryParams.put("entry-class", "");
+ queryParams.put("parallelism", "");
+ queryParams.put("savepointPath", "");
+ queryParams.put("allowNonRestoredState", "");
+
+ // Only empty strings configured, so all defaults should apply
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+
+ assertEquals(0, config.getProgramArgs().length);
+ assertNull(config.getEntryClass());
+ assertEquals(1, config.getParallelism());
+ assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/partials/submit.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/submit.jade b/flink-runtime-web/web-dashboard/app/partials/submit.jade
index 45547a0..03e10fa 100644
--- a/flink-runtime-web/web-dashboard/app/partials/submit.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/submit.jade
@@ -85,6 +85,13 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
| {{state['submit-button']}}
|
i.fa.fa-spin.fa-spinner(ng-if="state['submit-button'] == 'Submitting'")
+ tr
+ td
+ input.form-control(type="text" placeholder="Savepoint Path" title="Savepoint Path" ng-model="state['savepointPath']")
+ td
+ label.checkbox-inline
+ input.checkbox-inline(type="checkbox", ng-model="state['allowNonRestoredState']")
+ | Allow Non Restored State
table.table.table-no-border(ng-if="jid")
tbody
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index aeb77bc..179e172 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
.value 'flinkConfig', {
jobServer: ''
-# jobServer: 'http://localhost:8081/'
+# jobServer: 'http://localhost:8081/'
"refresh-interval": 10000
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index be495e4..0cd95a5 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -32,6 +32,8 @@ angular.module('flinkApp')
$scope.state = {
selected: null,
parallelism: "",
+ savepointPath: "",
+ allowNonRestoredState: false
'entry-class': "",
'program-args': "",
'plan-button': "Show Plan",
@@ -100,7 +102,9 @@ angular.module('flinkApp')
$scope.state.selected, {
'entry-class': $scope.state['entry-class'],
parallelism: $scope.state.parallelism,
- 'program-args': $scope.state['program-args']
+ 'program-args': $scope.state['program-args'],
+ savepointPath: $scope.state['savepointPath'],
+ allowNonRestoredState: $scope.state['allowNonRestoredState']
}
).then (data) ->
if action == $scope.state['action-time']
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
index 0f2cb96..f4b4aa6 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
@@ -23,7 +23,7 @@ angular.module('flinkApp')
@loadJarList = () ->
deferred = $q.defer()
- $http.get("jars/")
+ $http.get(flinkConfig.jobServer + "jars/")
.success (data, status, headers, config) ->
deferred.resolve(data)
@@ -32,7 +32,7 @@ angular.module('flinkApp')
@deleteJar = (id) ->
deferred = $q.defer()
- $http.delete("jars/" + encodeURIComponent(id))
+ $http.delete(flinkConfig.jobServer + "jars/" + encodeURIComponent(id))
.success (data, status, headers, config) ->
deferred.resolve(data)
@@ -41,7 +41,7 @@ angular.module('flinkApp')
@getPlan = (id, args) ->
deferred = $q.defer()
- $http.get("jars/" + encodeURIComponent(id) + "/plan", {params: args})
+ $http.get(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/plan", {params: args})
.success (data, status, headers, config) ->
deferred.resolve(data)
@@ -50,7 +50,7 @@ angular.module('flinkApp')
@runJob = (id, args) ->
deferred = $q.defer()
- $http.post("jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
+ $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
.success (data, status, headers, config) ->
deferred.resolve(data)