You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 07:07:04 UTC

[5/5] flink git commit: [FLINK-4935] [webfrontend] Submit job with savepoint

[FLINK-4935] [webfrontend] Submit job with savepoint

This closes #2714.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6c88cec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6c88cec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6c88cec

Branch: refs/heads/master
Commit: c6c88cec4fcce2e8fcee2a9cfc14d7857d6b6b06
Parents: c0e620f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Oct 27 16:26:36 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 08:06:30 2016 +0100

----------------------------------------------------------------------
 docs/monitoring/rest_api.md                     |  36 +++
 .../webmonitor/handlers/JarActionHandler.java   | 221 +++++++++++------
 .../webmonitor/handlers/JarPlanHandler.java     |   3 +-
 .../webmonitor/handlers/JarRunHandler.java      |   3 +-
 .../handlers/JarActionHandlerTest.java          |  92 +++++++
 .../web-dashboard/app/partials/submit.jade      |   7 +
 .../web-dashboard/app/scripts/index.coffee      |   2 +-
 .../scripts/modules/submit/submit.ctrl.coffee   |   6 +-
 .../scripts/modules/submit/submit.svc.coffee    |   8 +-
 .../web-dashboard/web/css/vendor.css            |   2 +-
 flink-runtime-web/web-dashboard/web/js/index.js | 242 ++++++++++---------
 .../web-dashboard/web/js/vendor.js              |   2 +-
 .../web-dashboard/web/partials/submit.html      |  12 +-
 .../jobgraph/SavepointRestoreSettings.java      |  17 ++
 14 files changed, 449 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/docs/monitoring/rest_api.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index e84e2cc..13ba85a 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -74,6 +74,11 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
   - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>`
   - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators`
   - `/jobs/<jobid>/plan`
+  - `/jars/upload`
+  - `/jars`
+  - `/jars/:jarid`
+  - `/jars/:jarid/plan`
+  - `/jars/:jarid/run`
 
 
 ### General
@@ -652,3 +657,34 @@ The `savepointPath` points to the external path of the savepoint, which can be u
   "cause": "<error message>"
 }
 ~~~
+
+### Submitting Programs
+
+It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
+
+#### Run a Program (POST)
+
+Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `jobmanager.web.upload.dir`).
+
+You can specify the following query parameters (all optional):
+
+- **Program arguments**: `program-args=arg1 arg2 arg3`
+- **Main class to execute**: `entry-class=EntryClassName.class`
+- **Default parallelism**: `parallelism=4`
+- **Savepoint path to restore from**: `savepointPath=hdfs://path/to/savepoint`
+- **Allow non restored state**: `allowNonRestoredState=true`
+
+If the call succeeds, you will get a response with the ID of the submitted job.
+
+**Example:** Run program with a savepoint
+
+Request:
+~~~
+POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
+~~~
+
+Response:
+~~~
+{"jobid": "869a9868d49c679e7355700e0857af85"}
+~~~
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 1e23f1f..5abe117 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -34,6 +35,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.File;
@@ -56,43 +58,18 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 		jarDir = jarDirectory;
 	}
 
-	protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(Map<String, String> pathParams, Map<String, String> queryParams) throws Exception {
+	protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(JarActionHandlerConfig config) throws Exception {
 		// generate the graph
 		JobGraph graph = null;
-		final String file = pathParams.get("jarid");
-		if (file == null) {
-			throw new IllegalArgumentException("No jarid was provided.");
-		}
-
-		final List<String> programArgs;
-		// parse required params
-		String param = queryParams.get("program-args");
-		programArgs = (param != null && !param.equals("")) ? tokenizeArguments(param) : new ArrayList<String>();
-
-		final String entryClassOpt = queryParams.get("entry-class");
-		final String parallelismOpt = queryParams.get("parallelism");
-
-		int parallelism = 1;
-		String entryClass = null;
-
-		if (parallelismOpt != null && !parallelismOpt.equals("")) {
-			parallelism = Integer.parseInt(parallelismOpt);
-			if (parallelism < 1) {
-				throw new IllegalArgumentException("Parallelism must be a positive number.");
-			}
-		}
-
-		// get entry class
-		if (entryClassOpt != null && !entryClassOpt.equals("")) {
-			entryClass = entryClassOpt;
-		}
 
-		PackagedProgram program = new PackagedProgram(new File(jarDir, file), entryClass,
-				programArgs.toArray(new String[programArgs.size()]));
+		PackagedProgram program = new PackagedProgram(
+				new File(jarDir, config.getJarFile()),
+				config.getEntryClass(),
+				config.getProgramArgs());
 		ClassLoader classLoader = program.getUserCodeClassLoader();
 
 		Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
-		FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, parallelism);
+		FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, config.getParallelism());
 
 		if (plan instanceof StreamingPlan) {
 			graph = ((StreamingPlan) plan).getJobGraph();
@@ -102,6 +79,10 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 		if (graph == null) {
 			throw new CompilerException("A valid job graph couldn't be generated for the jar.");
 		}
+
+		// Set the savepoint settings
+		graph.setSavepointRestoreSettings(config.getSavepointRestoreSettings());
+
 		for (URL jar : program.getAllLibraries()) {
 			try {
 				graph.addJar(new Path(jar.toURI()));
@@ -113,49 +94,6 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 		return Tuple2.of(graph, classLoader);
 	}
 
-	/**
-	 * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
-	 * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
-	 * takes care of quotes, such that quoted passages end up being one string.
-	 *
-	 * @param args
-	 *        The string to be split.
-	 * @return The array of split strings.
-	 */
-	private static List<String> tokenizeArguments(String args) {
-		List<String> list = new ArrayList<String>();
-		StringBuilder curr = new StringBuilder();
-
-		int pos = 0;
-		boolean quoted = false;
-
-		while (pos < args.length()) {
-			char c = args.charAt(pos);
-			if ((c == ' ' || c == '\t') && !quoted) {
-				if (curr.length() > 0) {
-					list.add(curr.toString());
-					curr.setLength(0);
-				}
-			} else if (c == '"') {
-				quoted = !quoted;
-			} else {
-				curr.append(c);
-			}
-
-			pos++;
-		}
-
-		if (quoted) {
-			throw new IllegalArgumentException("Unterminated quoted string.");
-		}
-
-		if (curr.length() > 0) {
-			list.add(curr.toString());
-		}
-
-		return list;
-	}
-
 	protected String sendError(Exception e) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -167,4 +105,139 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 		
 		return writer.toString();
 	}
+
+	/**
+	 * Wrapper for all configuration that is parsed from query and path args.
+	 */
+	@VisibleForTesting
+	static class JarActionHandlerConfig {
+
+		private final String jarFile;
+		private final String[] programArgs;
+		private final String entryClass;
+		private final int parallelism;
+		private final SavepointRestoreSettings savepointRestoreSettings;
+
+		JarActionHandlerConfig(
+				String jarFile,
+				String[] programArgs,
+				String entryClass,
+				int parallelism,
+				SavepointRestoreSettings savepointRestoreSettings) {
+
+			this.jarFile = jarFile;
+			this.programArgs = programArgs;
+			this.entryClass = entryClass;
+			this.parallelism = parallelism;
+			this.savepointRestoreSettings = savepointRestoreSettings;
+		}
+
+		public String getJarFile() {
+			return jarFile;
+		}
+
+		public String[] getProgramArgs() {
+			return programArgs;
+		}
+
+		public String getEntryClass() {
+			return entryClass;
+		}
+
+		public int getParallelism() {
+			return parallelism;
+		}
+
+		public SavepointRestoreSettings getSavepointRestoreSettings() {
+			return savepointRestoreSettings;
+		}
+
+		public static JarActionHandlerConfig fromParams(Map<String, String> pathParams, Map<String, String> queryParams) {
+			// Jar file
+			String jarFile = pathParams.get("jarid");
+			if (jarFile == null) {
+				throw new IllegalArgumentException("No jarid was provided.");
+			}
+
+			// Program args
+			String[] programArgs = new String[0];
+			String programArgsOpt = queryParams.get("program-args");
+			if (programArgsOpt!= null && !programArgsOpt.equals("")) {
+				List<String> args = tokenizeArguments(programArgsOpt);
+				programArgs = args.toArray(new String[args.size()]);
+			}
+
+			// Entry class
+			String entryClass = null;
+			String entryClassOpt = queryParams.get("entry-class");
+			if (entryClassOpt != null && !entryClassOpt.equals("")) {
+				entryClass = entryClassOpt;
+			}
+
+			// Parallelism
+			int parallelism = 1;
+			String parallelismOpt = queryParams.get("parallelism");
+			if (parallelismOpt != null && !parallelismOpt.equals("")) {
+				parallelism = Integer.parseInt(parallelismOpt);
+				if (parallelism < 1) {
+					throw new IllegalArgumentException("Parallelism must be a positive number.");
+				}
+			}
+
+			// Savepoint restore settings
+			SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
+			String savepointPath = queryParams.get("savepointPath");
+			if (savepointPath != null && !savepointPath.equals("")) {
+				String allowNonRestoredOpt = queryParams.get("allowNonRestoredState");
+				boolean allowNonRestoredState = allowNonRestoredOpt != null && allowNonRestoredOpt.equals("true");
+				savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+			}
+
+			return new JarActionHandlerConfig(jarFile, programArgs, entryClass, parallelism, savepointSettings);
+		}
+
+		/**
+		 * Utility method that takes the given arguments, splits them at whitespace (space and tab), and
+		 * turns them into a list of Strings. Unlike the <tt>StringTokenizer</tt>, this method
+		 * takes care of quotes, such that each quoted passage ends up as a single string.
+		 *
+		 * @param args
+		 *        The string to be split.
+		 * @return The list of split strings.
+		 */
+		private static List<String> tokenizeArguments(String args) {
+			List<String> list = new ArrayList<String>();
+			StringBuilder curr = new StringBuilder();
+
+			int pos = 0;
+			boolean quoted = false;
+
+			while (pos < args.length()) {
+				char c = args.charAt(pos);
+				if ((c == ' ' || c == '\t') && !quoted) {
+					if (curr.length() > 0) {
+						list.add(curr.toString());
+						curr.setLength(0);
+					}
+				} else if (c == '"') {
+					quoted = !quoted;
+				} else {
+					curr.append(c);
+				}
+
+				pos++;
+			}
+
+			if (quoted) {
+				throw new IllegalArgumentException("Unterminated quoted string.");
+			}
+
+			if (curr.length() > 0) {
+				list.add(curr.toString());
+			}
+
+			return list;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 3a95d6a..bd0a6af 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -39,7 +39,8 @@ public class JarPlanHandler extends JarActionHandler {
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
-			JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
+			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+			JobGraph graph = getJobGraphAndClassLoader(config).f0;
 			StringWriter writer = new StringWriter();
 			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 			gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 8d3e57f..474be33 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -50,7 +50,8 @@ public class JarRunHandler extends JarActionHandler {
 	@Override
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
-			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams);
+			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 			try {
 				graph.f0.uploadUserJars(jobManager, timeout, clientConfig);
 			} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
new file mode 100644
index 0000000..fbac126
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.JarActionHandlerConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JarActionHandlerTest {
+
+	/**
+	 * Test that the savepoint settings are correctly parsed.
+	 */
+	@Test
+	public void testSavepointRestoreSettings() throws Exception {
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put("jarid", "required"); // required
+
+		// the following should be ignored, because they are parsed from the query params
+		pathParams.put("savepointPath", "ignored");
+		pathParams.put("allowNonRestoredState", "ignored");
+
+		Map<String, String> queryParams = new HashMap<>(); // <-- everything goes here
+
+		// Nothing configured
+		JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+		assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
+
+		// Set path
+		queryParams.put("savepointPath", "the-savepoint-path");
+		queryParams.put("allowNonRestoredState", "");
+
+		SavepointRestoreSettings expected = SavepointRestoreSettings.forPath("the-savepoint-path", false);
+
+		config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+		assertEquals(expected, config.getSavepointRestoreSettings());
+
+		// Set flag
+		queryParams.put("allowNonRestoredState", "true");
+
+		expected = SavepointRestoreSettings.forPath("the-savepoint-path", true);
+		config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+		assertEquals(expected, config.getSavepointRestoreSettings());
+	}
+
+	/**
+	 * Tests that empty String params are ignored.
+	 */
+	@Test
+	public void testEmptyStringParams() throws Exception {
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put("jarid", "required"); // required
+		Map<String, String> queryParams = new HashMap<>();
+
+		queryParams.put("program-args", "");
+		queryParams.put("entry-class", "");
+		queryParams.put("parallelism", "");
+		queryParams.put("savepointPath", "");
+		queryParams.put("allowNonRestoredState", "");
+
+		// Nothing configured
+		JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+
+		assertEquals(0, config.getProgramArgs().length);
+		assertNull(config.getEntryClass());
+		assertEquals(1, config.getParallelism());
+		assertEquals(SavepointRestoreSettings.none(), config.getSavepointRestoreSettings());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/partials/submit.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/submit.jade b/flink-runtime-web/web-dashboard/app/partials/submit.jade
index 45547a0..03e10fa 100644
--- a/flink-runtime-web/web-dashboard/app/partials/submit.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/submit.jade
@@ -85,6 +85,13 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
             | {{state['submit-button']}}
           | &nbsp;
           i.fa.fa-spin.fa-spinner(ng-if="state['submit-button'] == 'Submitting'")
+      tr
+        td
+          input.form-control(type="text" placeholder="Savepoint Path" title="Savepoint Path" ng-model="state['savepointPath']")
+        td
+          label.checkbox-inline
+            input.checkbox-inline(type="checkbox", ng-model="state['allowNonRestoredState']")
+            | Allow Non Restored State
 
   table.table.table-no-border(ng-if="jid")
     tbody

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index aeb77bc..179e172 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
 
 .value 'flinkConfig', {
   jobServer: ''
-# jobServer: 'http://localhost:8081/'
+#  jobServer: 'http://localhost:8081/'
   "refresh-interval": 10000
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index be495e4..0cd95a5 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -32,6 +32,8 @@ angular.module('flinkApp')
     $scope.state = {
       selected: null,
       parallelism: "",
+      savepointPath: "",
+      allowNonRestoredState: false
       'entry-class': "",
       'program-args': "",
       'plan-button': "Show Plan",
@@ -100,7 +102,9 @@ angular.module('flinkApp')
         $scope.state.selected, {
           'entry-class': $scope.state['entry-class'],
           parallelism: $scope.state.parallelism,
-          'program-args': $scope.state['program-args']
+          'program-args': $scope.state['program-args'],
+          savepointPath: $scope.state['savepointPath'],
+          allowNonRestoredState: $scope.state['allowNonRestoredState']
         }
       ).then (data) ->
         if action == $scope.state['action-time']

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c88cec/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
index 0f2cb96..f4b4aa6 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
@@ -23,7 +23,7 @@ angular.module('flinkApp')
   @loadJarList = () ->
     deferred = $q.defer()
 
-    $http.get("jars/")
+    $http.get(flinkConfig.jobServer + "jars/")
     .success (data, status, headers, config) ->
       deferred.resolve(data)
 
@@ -32,7 +32,7 @@ angular.module('flinkApp')
   @deleteJar = (id) ->
     deferred = $q.defer()
 
-    $http.delete("jars/" + encodeURIComponent(id))
+    $http.delete(flinkConfig.jobServer + "jars/" + encodeURIComponent(id))
     .success (data, status, headers, config) ->
        deferred.resolve(data)
 
@@ -41,7 +41,7 @@ angular.module('flinkApp')
   @getPlan = (id, args) ->
     deferred = $q.defer()
 
-    $http.get("jars/" + encodeURIComponent(id) + "/plan", {params: args})
+    $http.get(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/plan", {params: args})
     .success (data, status, headers, config) ->
       deferred.resolve(data)
 
@@ -50,7 +50,7 @@ angular.module('flinkApp')
   @runJob = (id, args) ->
     deferred = $q.defer()
 
-    $http.post("jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
+    $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
     .success (data, status, headers, config) ->
       deferred.resolve(data)