You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/07/01 12:56:56 UTC

[1/2] flink git commit: [FLINK-2175] extending WebClient and CLI to show ProgramDescription added WordCountMeta example (used ProgramDescription interface to display meta data by clients) extended WebClient documentation

Repository: flink
Updated Branches:
  refs/heads/master abaf4af17 -> bd3c8d525


[FLINK-2175] extending WebClient and CLI to show ProgramDescription
added WordCountMeta example (used ProgramDescription interface to display meta data by clients)
extended WebClient documentation

This closes #707


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd3c8d52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd3c8d52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd3c8d52

Branch: refs/heads/master
Commit: bd3c8d525203fddf1770b6eb7221f71049480ad8
Parents: 7e75c7b
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Fri Jun 5 18:55:08 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jul 1 12:56:41 2015 +0200

----------------------------------------------------------------------
 docs/apis/web_client.md                         |  4 +-
 .../org/apache/flink/client/CliFrontend.java    | 10 ++++
 .../apache/flink/client/web/JobsServlet.java    | 19 ++++++-
 .../src/main/resources/web-docs/js/program.js   |  7 ++-
 .../examples/java/wordcount/WordCountMeta.java  | 54 ++++++++++++++++++++
 5 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd3c8d52/docs/apis/web_client.md
----------------------------------------------------------------------
diff --git a/docs/apis/web_client.md b/docs/apis/web_client.md
index a548a6a..d3680f1 100644
--- a/docs/apis/web_client.md
+++ b/docs/apis/web_client.md
@@ -56,12 +56,14 @@ You can **upload** a Flink program as a jar file. To **execute** an uploaded pro
 
 If the *“Show optimizer plan”* option is enabled (default), the *plan view* is display next, otherwise the job is directly submitted to the JobManager for execution.
 
-In case the jar's manifest file does not specify the program class, you can specify it before the argument list as:
+The web interface can also handle multiple Flink jobs within a single jar file. To use this feature, package all required class files of all jobs into a single jar and specify the entry classes for each job as comma-separated values in the *program-class* attribute of the jar's manifest file. The job view displays each entry class and you can pick any of them to preview the plan and/or submit the job to the JobManager. In case the jar's manifest file does not specify any entry class, you can specify it before the argument list as:
 
 ```
 -c <assemblerClass> <programArgs...>
 ```
 
+Furthermore, for each entry class implementing the ```ProgramDescription``` interface, the provided description is shown as a tooltip for the job (see {% gh_link flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCountMeta.java  "WordCountMeta example" %}).
+
 ### Plan View
 
 The plan view shows the optimized execution plan of the submitted program in the upper half of the page. The bottom part of the page displays detailed information about the currently selected plan operator including:

http://git-wip-us.apache.org/repos/asf/flink/blob/bd3c8d52/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e1bacde..241df2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -378,6 +378,16 @@ public class CliFrontend {
 			else {
 				System.out.println("JSON plan could not be generated.");
 			}
+
+			String description = program.getDescription();
+			if (description != null) {
+				System.out.println();
+				System.out.println(description);
+			}
+			else {
+				System.out.println();
+				System.out.println("No description provided.");
+			}
 			return 0;
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd3c8d52/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index cee13dd..abf3b3f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -41,6 +41,7 @@ import org.apache.commons.fileupload.FileUploadException;
 import org.apache.commons.fileupload.disk.DiskFileItemFactory;
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
 
 /**
  * A servlet that accepts uploads of pact programs, returns a listing of the
@@ -144,6 +145,8 @@ public class JobsServlet extends HttpServlet {
 				JarFile jar = new JarFile(files[i]);
 				Manifest manifest = jar.getManifest();
 				String assemblerClass = null;
+				String descriptions = "";
+
 				if (manifest != null) {
 					assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
 					if (assemblerClass == null) {
@@ -153,6 +156,20 @@ public class JobsServlet extends HttpServlet {
 				if (assemblerClass == null) {
 					assemblerClass = "";
 				} else {
+					String[] classes = assemblerClass.split(",");
+					for (String c : classes) {
+						try {
+							String d = new PackagedProgram(files[i], c, new String[0]).getDescription();
+							if (d == null) {
+								d = "No description provided.";
+							}
+							descriptions += "#_#" + d;
+						} catch (ProgramInvocationException e) {
+							descriptions += "#_#No description provided.";
+							continue;
+						}
+					}
+
 					assemblerClass = '\t' + assemblerClass;
 				}
 
@@ -160,7 +177,7 @@ public class JobsServlet extends HttpServlet {
 				writer.println(files[i].getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
 					+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
 					+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
-					+ cal.get(GregorianCalendar.SECOND) + assemblerClass);
+					+ cal.get(GregorianCalendar.SECOND) + assemblerClass + descriptions);
 			}
 		} else if (action.equals(ACTION_DELETE_VALUE)) {
 			String filename = req.getParameter(FILENAME_PARAM_NAME);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd3c8d52/flink-clients/src/main/resources/web-docs/js/program.js
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/resources/web-docs/js/program.js b/flink-clients/src/main/resources/web-docs/js/program.js
index 619deb8..d3c4c88 100644
--- a/flink-clients/src/main/resources/web-docs/js/program.js
+++ b/flink-clients/src/main/resources/web-docs/js/program.js
@@ -150,7 +150,9 @@ function createJobList(data)
 {
   var markup = "";
   
-  var lines = data.split("\n");
+  var entries = data.split("#_#");
+  
+  var lines = entries[0].split("\n");
   for (var i = 0; i < lines.length; i++)
   {
     if (lines[i] == null || lines[i].length == 0) {
@@ -179,9 +181,10 @@ function createJobList(data)
     markup += '<td><p class="JobListItemsDate">' + date + '</p></td>';
     markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td></tr>';
     
+    var i = 0;
     for (var idx in classes) {
       markup += '<tr><td width="30px;"><input id="' + classes[idx] + '" class="jobItemCheckbox" type="checkbox"></td>';
-      markup += '<td colspan="3"><p class="JobListItemsDate">' + classes[idx] + '</p></td></tr>';
+      markup += '<td colspan="3"><p class="JobListItemsDate" title="' + entries[++i] + '">' + classes[idx] + '</p></td></tr>';
     }
     markup += '</table></div>';
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd3c8d52/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCountMeta.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCountMeta.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCountMeta.java
new file mode 100644
index 0000000..b8bd7c1
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCountMeta.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.wordcount;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+/**
+ * Same as {@link WordCount} but implements the {@link ProgramDescription} interface.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCountMeta [&lt;text path&gt; &lt;result path&gt;]</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows:
+ * <ul>
+ * <li>how to provide additional information (using the {@link ProgramDescription} interface) that can be
+ * displayed by Flink clients, i.e., {@code bin/flink} and the WebClient</li>
+ * </ul>
+ * 
+ */
+public class WordCountMeta extends WordCount implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+		WordCount.main(args);
+	}
+
+	@Override
+	public String getDescription() {
+		return "Simple Word-Count Example\n"
+				+ "Parameters: [<text path> <result path>]\n"
+				+ "If no parameters are provided, the example will run with built-in default data.";
+	}
+}


[2/2] flink git commit: [FLINK-2175] added UI support for multiple "assembler class" entries in jar files - program-class can contain multiple comma separated entry classes - Main-Class can contain multiple comma separated entry classes - WebClient

Posted by rm...@apache.org.
[FLINK-2175] added UI support for multiple "assembler class" entries in jar files
  - program-class can contain multiple comma separated entry classes
  - Main-Class can contain multiple comma separated entry classes
  - WebClient shows checkbox for each entry class and can display plan for it


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e75c7b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e75c7b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e75c7b3

Branch: refs/heads/master
Commit: 7e75c7b3cc89d8b81341dea6911c39cbfaa6138d
Parents: abaf4af
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Sun May 17 22:25:17 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jul 1 12:56:41 2015 +0200

----------------------------------------------------------------------
 .../flink/client/web/JobSubmissionServlet.java  | 987 ++++++++++---------
 .../apache/flink/client/web/JobsServlet.java    | 442 +++++----
 .../flink/client/web/PactJobJSONServlet.java    |   4 +-
 .../src/main/resources/web-docs/js/program.js   |  89 +-
 4 files changed, 777 insertions(+), 745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 205d0b2..5ec698b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -1,492 +1,495 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class JobSubmissionServlet extends HttpServlet {
-	
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8447312301029847397L;
-
-	// ------------------------------------------------------------------------
-
-	public static final String START_PAGE_URL = "launch.html";
-
-	private static final String ACTION_PARAM_NAME = "action";
-
-	private static final String ACTION_SUBMIT_VALUE = "submit";
-
-	private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
-
-	private static final String ACTION_BACK_VALUE = "back";
-
-	private static final String JOB_PARAM_NAME = "job";
-
-	private static final String ARGUMENTS_PARAM_NAME = "arguments";
-
-	private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
-
-	private static final String SUSPEND_PARAM_NAME = "suspend";
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
-
-	// ------------------------------------------------------------------------
-
-	private final File jobStoreDirectory;				// the directory containing the uploaded jobs
-
-	private final File planDumpDirectory;				// the directory to dump the optimizer plans to
-
-	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
-
-	private final Random rand;							// random number generator for UID
-	
-	private final Configuration config;
-
-
-	public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
-		this.config = config;
-		this.jobStoreDirectory = jobDir;
-		this.planDumpDirectory = planDir;
-
-		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
-
-		this.rand = new Random(System.currentTimeMillis());
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		String action = req.getParameter(ACTION_PARAM_NAME);
-		if (checkParameterSet(resp, action, "action")) {
-			return;
-		}
-
-		// decide according to the action
-		if (action.equals(ACTION_SUBMIT_VALUE)) {
-			// --------------- submit a job -------------------
-
-			// get the parameters
-			String jobName = req.getParameter(JOB_PARAM_NAME);
-			String args = req.getParameter(ARGUMENTS_PARAM_NAME);
-			String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
-			String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
-
-			// check that all parameters are set
-			if (checkParameterSet(resp, jobName, JOB_PARAM_NAME) || checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
-				|| checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
-				|| checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
-			{
-				showErrorPage(resp, "Invalid request, missing parameters.");
-				return;
-			}
-
-			boolean show = Boolean.parseBoolean(showPlan);
-			boolean suspend = Boolean.parseBoolean(suspendPlan);
-
-			// check, if the jar exists
-			File jarFile = new File(jobStoreDirectory, jobName);
-			if (!jarFile.exists()) {
-				showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
-				return;
-			}
-
-			// parse the arguments
-			List<String> params;
-			try {
-				params = tokenizeArguments(args);
-			} catch (IllegalArgumentException iaex) {
-				showErrorPage(resp, "The arguments contain an unterminated quoted string.");
-				return;
-			}
-
-			String assemblerClass = null;
-			int parallelism = -1;
-			while(params.size() >= 2) {
-				if (params.get(0).equals("-c")) {
-					assemblerClass = params.get(1);
-					params.remove(0);
-					params.remove(0);
-				}
-				else if (params.get(0).equals("-p")) {
-					parallelism = Integer.parseInt(params.get(1));
-					params.remove(0);
-					params.remove(0);
-				}
-				else {
-					break;
-				}
-			}
-
-			// create the plan
-			String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
-			PackagedProgram program;
-			FlinkPlan optPlan;
-			Client client;
-			
-			try {
-				if (assemblerClass == null) {
-					program = new PackagedProgram(jarFile, options);
-				} else {
-					program = new PackagedProgram(jarFile, assemblerClass, options);
-				}
-				
-				client = new Client(config, program.getUserCodeClassLoader());
-				
-				optPlan = client.getOptimizedPlan(program, parallelism);
-				
-				if (optPlan == null) {
-					throw new Exception("The optimized plan could not be produced.");
-				}
-			}
-			catch (ProgramInvocationException e) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				
-				if (e.getCause() == null) {
-					e.printStackTrace(w);
-				} else {
-					e.getCause().printStackTrace(w);
-				}
-
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-				
-				showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
-					+ e.getMessage() + "<br/>"
-					+ "<br/><br/><pre>" + message + "</pre>");
-				return;
-			}
-			catch (CompilerException cex) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				cex.printStackTrace(w);
-				
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-
-				showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
-					+ cex.getMessage() + "<br/>"
-					+ (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
-					+ "<br/><br/><pre>" + message + "</pre>");
-				return;
-			}
-			catch (Throwable t) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				t.printStackTrace(w);
-
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-				
-				showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
-					+ message + "</pre>");
-				return;
-			}
-
-			// redirect according to our options
-			if (show) {
-				// we have a request to show the plan
-
-				// create a UID for the job
-				Long uid;
-				do {
-					uid = Math.abs(this.rand.nextLong());
-				} while (this.submittedJobs.containsKey(uid));
-
-				// dump the job to a JSON file
-				String planName = uid + ".json";
-				File jsonFile = new File(this.planDumpDirectory, planName);
-				
-				if (optPlan instanceof StreamingPlan) {
-					((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
-				}
-				else {
-					PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-					jsonGen.setEncodeForHTML(true);
-					jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
-				}
-				
-				// submit the job only, if it should not be suspended
-				if (!suspend) {
-					if (optPlan instanceof OptimizedPlan) {
-						try {
-							client.run(program, (OptimizedPlan) optPlan, false);
-						}
-						catch (Throwable t) {
-							LOG.error("Error submitting job to the job-manager.", t);
-							showErrorPage(resp, t.getMessage());
-							return;
-						}
-						finally {
-							program.deleteExtractedLibraries();
-						}
-					}
-					else {
-						throw new RuntimeException("Not implemented for Streaming Job plans");
-					}
-				}
-				else {
-					try {
-						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
-					}
-					catch (ProgramInvocationException piex) {
-						LOG.error("Error creating JobGraph from optimized plan.", piex);
-						showErrorPage(resp, piex.getMessage());
-						return;
-					}
-					catch (Throwable t) {
-						LOG.error("Error creating JobGraph from optimized plan.", t);
-						showErrorPage(resp, t.getMessage());
-						return;
-					}
-				}
-
-				// redirect to the plan display page
-				resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
-			}
-			else {
-				// don't show any plan. directly submit the job and redirect to the
-				// runtime monitor
-				try {
-					client.run(program, parallelism, false);
-				}
-				catch (Exception ex) {
-					LOG.error("Error submitting job to the job-manager.", ex);
-					// HACK: Is necessary because Message contains whole stack trace
-					String errorMessage = ex.getMessage().split("\n")[0];
-					showErrorPage(resp, errorMessage);
-					return;
-				}
-				finally {
-					program.deleteExtractedLibraries();
-				}
-				resp.sendRedirect(START_PAGE_URL);
-			}
-		}
-		else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
-			// --------------- run a job that has been submitted earlier, but was -------------------
-			// --------------- not executed because of a plan display -------------------
-
-			String id = req.getParameter("id");
-			if (checkParameterSet(resp, id, "id")) {
-				return;
-			}
-
-			Long uid = null;
-			try {
-				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
-				showErrorPage(resp, "An invalid id for the job was provided.");
-				return;
-			}
-
-			// get the retained job
-			JobGraph job = submittedJobs.remove(uid);
-			if (job == null) {
-				resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
-					"No job with the given uid was retained for later submission.");
-				return;
-			}
-
-			// submit the job
-			try {
-				Client client = new Client(config, getClass().getClassLoader());
-				client.run(job, false);
-			}
-			catch (Exception ex) {
-				LOG.error("Error submitting job to the job-manager.", ex);
-				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-				// HACK: Is necessary because Message contains whole stack trace
-				String errorMessage = ex.getMessage().split("\n")[0];
-				resp.getWriter().print(errorMessage);
-				// resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
-				return;
-			}
-
-			// redirect to the start page
-			resp.sendRedirect(START_PAGE_URL);
-		} else if (action.equals(ACTION_BACK_VALUE)) {
-			// remove the job from the map
-
-			String id = req.getParameter("id");
-			if (checkParameterSet(resp, id, "id")) {
-				return;
-			}
-
-			Long uid = null;
-			try {
-				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
-				showErrorPage(resp, "An invalid id for the job was provided.");
-				return;
-			}
-
-			// remove the retained job
-			submittedJobs.remove(uid);
-
-			// redirect to the start page
-			resp.sendRedirect(START_PAGE_URL);
-		} else {
-			showErrorPage(resp, "Invalid action specified.");
-			return;
-		}
-	}
-
-	/**
-	 * Prints the error page, containing the given message.
-	 * 
-	 * @param resp
-	 *        The response handler.
-	 * @param message
-	 *        The message to display.
-	 * @throws IOException
-	 *         Thrown, if the error page could not be printed due to an I/O problem.
-	 */
-	private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
-
-		PrintWriter writer = resp.getWriter();
-
-		writer
-			.println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n        \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
-		writer.println("<html>");
-		writer.println("<head>");
-		writer.println("  <title>Launch Job - Error</title>");
-		writer.println("  <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
-		writer.println("  <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
-		writer.println("</head>");
-
-		writer.println("<body>");
-		writer.println("  <div class=\"mainHeading\">");
-		writer.println("    <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
-		writer.println("  </div>");
-		writer.println("  <div style=\"margin-top: 50px; text-align: center;\">");
-		writer.println("    <p class=\"error_text\" style=\"font-size: 18px;\">");
-		writer.println(message);
-		writer.println("    </p><br/><br/>");
-		writer.println("    <form action=\"launch.html\" method=\"GET\">");
-		writer.println("      <input type=\"submit\" value=\"back\">");
-		writer.println("    </form>");
-		writer.println("  </div>");
-		writer.println("</body>");
-		writer.println("</html>");
-	}
-
-	/**
-	 * Checks the given parameter. If it is null, it prints the error page.
-	 * 
-	 * @param resp
-	 *        The response handler.
-	 * @param parameter
-	 *        The parameter to check.
-	 * @param parameterName
-	 *        The name of the parameter, to describe it in the error message.
-	 * @return True, if the parameter is null, false otherwise.
-	 * @throws IOException
-	 *         Thrown, if the error page could not be printed.
-	 */
-	private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
-			throws IOException {
-		if (parameter == null) {
-			showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	/**
-	 * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
-	 * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
-	 * takes care of quotes, such that quoted passages end up being one string.
-	 * 
-	 * @param args
-	 *        The string to be split.
-	 * @return The array of split strings.
-	 */
-	private static final List<String> tokenizeArguments(String args) {
-		List<String> list = new ArrayList<String>();
-		StringBuilder curr = new StringBuilder();
-
-		int pos = 0;
-		boolean quoted = false;
-
-		while (pos < args.length()) {
-			char c = args.charAt(pos);
-			if ((c == ' ' || c == '\t') && !quoted) {
-				if (curr.length() > 0) {
-					list.add(curr.toString());
-					curr.setLength(0);
-				}
-			} else if (c == '"') {
-				quoted = !quoted;
-			} else {
-				curr.append(c);
-			}
-
-			pos++;
-		}
-
-		if (quoted) {
-			throw new IllegalArgumentException("Unterminated quoted string.");
-		}
-
-		if (curr.length() > 0) {
-			list.add(curr.toString());
-		}
-
-		return list;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobSubmissionServlet extends HttpServlet {
+
+	/**
+	 * Serial UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = 8447312301029847397L;
+
+	// ------------------------------------------------------------------------
+
+	public static final String START_PAGE_URL = "launch.html";
+
+	private static final String ACTION_PARAM_NAME = "action";
+
+	private static final String ACTION_SUBMIT_VALUE = "submit";
+
+	private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
+
+	private static final String ACTION_BACK_VALUE = "back";
+
+	private static final String JOB_PARAM_NAME = "job";
+
+	private static final String CLASS_PARAM_NAME = "assemblerClass";
+
+	private static final String ARGUMENTS_PARAM_NAME = "arguments";
+
+	private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
+
+	private static final String SUSPEND_PARAM_NAME = "suspend";
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
+
+	// ------------------------------------------------------------------------
+
+	private final File jobStoreDirectory;				// the directory containing the uploaded jobs
+
+	private final File planDumpDirectory;				// the directory to dump the optimizer plans to
+
+	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
+
+	private final Random rand;							// random number generator for UID
+
+	private final Configuration config;
+
+
+	public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
+		this.config = config;
+		this.jobStoreDirectory = jobDir;
+		this.planDumpDirectory = planDir;
+
+		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
+
+		this.rand = new Random(System.currentTimeMillis());
+	}
+
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String action = req.getParameter(ACTION_PARAM_NAME);
+		if (checkParameterSet(resp, action, "action")) {
+			return;
+		}
+
+		// decide according to the action
+		if (action.equals(ACTION_SUBMIT_VALUE)) {
+			// --------------- submit a job -------------------
+
+			// get the parameters
+			String jobName = req.getParameter(JOB_PARAM_NAME);
+			String assemblerClass = req.getParameter(CLASS_PARAM_NAME);
+			String args = req.getParameter(ARGUMENTS_PARAM_NAME);
+			String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
+			String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
+
+			// check that all parameters are set
+			// do NOT check 'assemblerClass' -> it is OK if it is not set
+			if (checkParameterSet(resp, jobName, JOB_PARAM_NAME)
+				|| checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
+				|| checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
+				|| checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
+			{
+				return;
+			}
+
+			boolean show = Boolean.parseBoolean(showPlan);
+			boolean suspend = Boolean.parseBoolean(suspendPlan);
+
+			// check, if the jar exists
+			File jarFile = new File(jobStoreDirectory, jobName);
+			if (!jarFile.exists()) {
+				showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
+				return;
+			}
+
+			// parse the arguments
+			List<String> params;
+			try {
+				params = tokenizeArguments(args);
+			} catch (IllegalArgumentException iaex) {
+				showErrorPage(resp, "The arguments contain an unterminated quoted string.");
+				return;
+			}
+
+			int parallelism = -1;
+			while(params.size() >= 2) {
+				if (params.get(0).equals("-c")) {
+					assemblerClass = params.get(1);
+					params.remove(0);
+					params.remove(0);
+				}
+				else if (params.get(0).equals("-p")) {
+					parallelism = Integer.parseInt(params.get(1));
+					params.remove(0);
+					params.remove(0);
+				}
+				else {
+					break;
+				}
+			}
+
+			// create the plan
+			String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
+			PackagedProgram program;
+			FlinkPlan optPlan;
+			Client client;
+
+			try {
+				if (assemblerClass == null) {
+					program = new PackagedProgram(jarFile, options);
+				} else {
+					program = new PackagedProgram(jarFile, assemblerClass, options);
+				}
+
+				client = new Client(config, program.getUserCodeClassLoader());
+
+				optPlan = client.getOptimizedPlan(program, parallelism);
+
+				if (optPlan == null) {
+					throw new Exception("The optimized plan could not be produced.");
+				}
+			}
+			catch (ProgramInvocationException e) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+
+				if (e.getCause() == null) {
+					e.printStackTrace(w);
+				} else {
+					e.getCause().printStackTrace(w);
+				}
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
+					+ e.getMessage() + "<br/>"
+					+ "<br/><br/><pre>" + message + "</pre>");
+				return;
+			}
+			catch (CompilerException cex) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+				cex.printStackTrace(w);
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
+					+ cex.getMessage() + "<br/>"
+					+ (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
+					+ "<br/><br/><pre>" + message + "</pre>");
+				return;
+			}
+			catch (Throwable t) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+				t.printStackTrace(w);
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
+					+ message + "</pre>");
+				return;
+			}
+
+			// redirect according to our options
+			if (show) {
+				// we have a request to show the plan
+
+				// create a UID for the job
+				Long uid;
+				do {
+					uid = Math.abs(this.rand.nextLong());
+				} while (this.submittedJobs.containsKey(uid));
+
+				// dump the job to a JSON file
+				String planName = uid + ".json";
+				File jsonFile = new File(this.planDumpDirectory, planName);
+
+				if (optPlan instanceof StreamingPlan) {
+					((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
+				}
+				else {
+					PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+					jsonGen.setEncodeForHTML(true);
+					jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
+				}
+
+				// submit the job only, if it should not be suspended
+				if (!suspend) {
+					if (optPlan instanceof OptimizedPlan) {
+						try {
+							client.run(program, (OptimizedPlan) optPlan, false);
+						}
+						catch (Throwable t) {
+							LOG.error("Error submitting job to the job-manager.", t);
+							showErrorPage(resp, t.getMessage());
+							return;
+						}
+						finally {
+							program.deleteExtractedLibraries();
+						}
+					}
+					else {
+						throw new RuntimeException("Not implemented for Streaming Job plans");
+					}
+				}
+				else {
+					try {
+						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
+					}
+					catch (ProgramInvocationException piex) {
+						LOG.error("Error creating JobGraph from optimized plan.", piex);
+						showErrorPage(resp, piex.getMessage());
+						return;
+					}
+					catch (Throwable t) {
+						LOG.error("Error creating JobGraph from optimized plan.", t);
+						showErrorPage(resp, t.getMessage());
+						return;
+					}
+				}
+
+				// redirect to the plan display page
+				resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
+			}
+			else {
+				// don't show any plan. directly submit the job and redirect to the
+				// runtime monitor
+				try {
+					client.run(program, parallelism, false);
+				}
+				catch (Exception ex) {
+					LOG.error("Error submitting job to the job-manager.", ex);
+					// HACK: Is necessary because Message contains whole stack trace
+					String errorMessage = ex.getMessage().split("\n")[0];
+					showErrorPage(resp, errorMessage);
+					return;
+				}
+				finally {
+					program.deleteExtractedLibraries();
+				}
+				resp.sendRedirect(START_PAGE_URL);
+			}
+		}
+		else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
+			// --------------- run a job that has been submitted earlier, but was -------------------
+			// --------------- not executed because of a plan display -------------------
+
+			String id = req.getParameter("id");
+			if (checkParameterSet(resp, id, "id")) {
+				return;
+			}
+
+			Long uid = null;
+			try {
+				uid = Long.parseLong(id);
+			} catch (NumberFormatException nfex) {
+				showErrorPage(resp, "An invalid id for the job was provided.");
+				return;
+			}
+
+			// get the retained job
+			JobGraph job = submittedJobs.remove(uid);
+			if (job == null) {
+				resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
+					"No job with the given uid was retained for later submission.");
+				return;
+			}
+
+			// submit the job
+			try {
+				Client client = new Client(config, getClass().getClassLoader());
+				client.run(job, false);
+			}
+			catch (Exception ex) {
+				LOG.error("Error submitting job to the job-manager.", ex);
+				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+				// HACK: Is necessary because Message contains whole stack trace
+				String errorMessage = ex.getMessage().split("\n")[0];
+				resp.getWriter().print(errorMessage);
+				// resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
+				return;
+			}
+
+			// redirect to the start page
+			resp.sendRedirect(START_PAGE_URL);
+		} else if (action.equals(ACTION_BACK_VALUE)) {
+			// remove the job from the map
+
+			String id = req.getParameter("id");
+			if (checkParameterSet(resp, id, "id")) {
+				return;
+			}
+
+			Long uid = null;
+			try {
+				uid = Long.parseLong(id);
+			} catch (NumberFormatException nfex) {
+				showErrorPage(resp, "An invalid id for the job was provided.");
+				return;
+			}
+
+			// remove the retained job
+			submittedJobs.remove(uid);
+
+			// redirect to the start page
+			resp.sendRedirect(START_PAGE_URL);
+		} else {
+			showErrorPage(resp, "Invalid action specified.");
+			return;
+		}
+	}
+
+	/**
+	 * Prints the error page, containing the given message.
+	 * 
+	 * @param resp
+	 *        The response handler.
+	 * @param message
+	 *        The message to display.
+	 * @throws IOException
+	 *         Thrown, if the error page could not be printed due to an I/O problem.
+	 */
+	private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
+		resp.setStatus(HttpServletResponse.SC_OK);
+		resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
+
+		PrintWriter writer = resp.getWriter();
+
+		writer
+			.println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n        \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
+		writer.println("<html>");
+		writer.println("<head>");
+		writer.println("  <title>Launch Job - Error</title>");
+		writer.println("  <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
+		writer.println("  <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
+		writer.println("</head>");
+
+		writer.println("<body>");
+		writer.println("  <div class=\"mainHeading\">");
+		writer.println("    <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
+		writer.println("  </div>");
+		writer.println("  <div style=\"margin-top: 50px; text-align: center;\">");
+		writer.println("    <p class=\"error_text\" style=\"font-size: 18px;\">");
+		writer.println(message);
+		writer.println("    </p><br/><br/>");
+		writer.println("    <form action=\"launch.html\" method=\"GET\">");
+		writer.println("      <input type=\"submit\" value=\"back\">");
+		writer.println("    </form>");
+		writer.println("  </div>");
+		writer.println("</body>");
+		writer.println("</html>");
+	}
+
+	/**
+	 * Checks the given parameter. If it is null, it prints the error page.
+	 * 
+	 * @param resp
+	 *        The response handler.
+	 * @param parameter
+	 *        The parameter to check.
+	 * @param parameterName
+	 *        The name of the parameter, to describe it in the error message.
+	 * @return True, if the parameter is null, false otherwise.
+	 * @throws IOException
+	 *         Thrown, if the error page could not be printed.
+	 */
+	private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
+			throws IOException {
+		if (parameter == null) {
+			showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Splits the given argument string into tokens at unquoted spaces and tabs. Unlike the
+	 * <tt>StringTokenizer</tt>, this method honors double quotes: whitespace inside a quoted passage
+	 * does not end a token. The quote characters themselves are not part of the tokens, and empty
+	 * tokens are not added to the result.
+	 * 
+	 * @param args
+	 *        The string to be split.
+	 * @return The list of tokens, in the order of their occurrence.
+	 * @throws IllegalArgumentException
+	 *         Thrown, if a quoted passage is not closed.
+	 */
+	private static final List<String> tokenizeArguments(String args) {
+		final List<String> tokens = new ArrayList<String>();
+		final StringBuilder token = new StringBuilder();
+		boolean insideQuotes = false;
+
+		for (int i = 0; i < args.length(); i++) {
+			final char ch = args.charAt(i);
+
+			// a quote only toggles the quoting state
+			if (ch == '"') {
+				insideQuotes = !insideQuotes;
+				continue;
+			}
+
+			final boolean separator = !insideQuotes && (ch == ' ' || ch == '\t');
+			if (!separator) {
+				token.append(ch);
+			} else if (token.length() > 0) {
+				tokens.add(token.toString());
+				token.setLength(0);
+			}
+		}
+
+		if (insideQuotes) {
+			throw new IllegalArgumentException("Unterminated quoted string.");
+		}
+
+		// the last token is not followed by a separator
+		if (token.length() > 0) {
+			tokens.add(token.toString());
+		}
+
+		return tokens;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index b3c9cc4..cee13dd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -1,212 +1,230 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.FileUploadException;
-import org.apache.commons.fileupload.disk.DiskFileItemFactory;
-import org.apache.commons.fileupload.servlet.ServletFileUpload;
-
-/**
- * A servlet that accepts uploads of pact programs, returns a listing of the
- * directory containing the job jars, and deleting these jars.
- */
-public class JobsServlet extends HttpServlet {
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = -1373210261957434273L;
-
-	// ------------------------------------------------------------------------
-
-	private static final String ACTION_PARAM_NAME = "action";
-
-	private static final String ACTION_LIST_VALUE = "list";
-
-	private static final String ACTION_DELETE_VALUE = "delete";
-
-	private static final String FILENAME_PARAM_NAME = "filename";
-
-	private static final String CONTENT_TYPE_PLAIN = "text/plain";
-
-	private static final Comparator<File> FILE_SORTER = new Comparator<File>() {
-		@Override
-		public int compare(File o1, File o2) {
-			return o1.getName().compareTo(o2.getName());
-		}
-
-	};
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The file object for the directory for temporary files.
-	 */
-	private final File tmpDir;
-
-	/**
-	 * The file object for the directory for the submitted jars.
-	 */
-	private final File destinationDir;
-
-	/**
-	 * The name of the file to redirect to after the upload.
-	 */
-	private final String targetPage;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new <tt>JobServlet</tt>, configured with the given directories
-	 * and the given result page.
-	 * 
-	 * @param jobsDir
-	 *        The directory to store uploaded jobs in.
-	 * @param tmpDir
-	 *        The directory for temporary files.
-	 * @param targetPage
-	 *        The page to redirect to after a successful upload.
-	 */
-	public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
-		this.tmpDir = tmpDir;
-		this.destinationDir = jobsDir;
-		this.targetPage = targetPage;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public void init(ServletConfig config) throws ServletException {
-		super.init(config);
-
-		if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
-			throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
-		}
-
-		if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
-			throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
-		}
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		String action = req.getParameter(ACTION_PARAM_NAME);
-
-		if (action.equals(ACTION_LIST_VALUE)) {
-			GregorianCalendar cal = new GregorianCalendar();
-
-			File[] files = destinationDir.listFiles();
-			Arrays.<File> sort(files, FILE_SORTER);
-
-			resp.setStatus(HttpServletResponse.SC_OK);
-			resp.setContentType(CONTENT_TYPE_PLAIN);
-
-			PrintWriter writer = resp.getWriter();
-			for (int i = 0; i < files.length; i++) {
-				if (!files[i].getName().endsWith(".jar")) {
-					continue;
-				}
-
-				cal.setTimeInMillis(files[i].lastModified());
-				writer.println(files[i].getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
-					+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
-					+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
-					+ cal.get(GregorianCalendar.SECOND));
-			}
-		} else if (action.equals(ACTION_DELETE_VALUE)) {
-			String filename = req.getParameter(FILENAME_PARAM_NAME);
-
-			if (filename == null || filename.length() == 0) {
-				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			} else {
-				File f = new File(destinationDir, filename);
-				if (!f.exists() || f.isDirectory()) {
-					resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
-				}
-				f.delete();
-				resp.setStatus(HttpServletResponse.SC_OK);
-			}
-		} else {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-		}
-	}
-
-	@Override
-	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		// check, if we are doing the right request
-		if (!ServletFileUpload.isMultipartContent(req)) {
-			resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
-			return;
-		}
-
-		// create the disk file factory, limiting the file size to 20 MB
-		DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
-		fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
-		fileItemFactory.setRepository(tmpDir);
-
-		String filename = null;
-
-		// parse the request
-		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
-		try {
-			Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
-
-			// go over the form fields and look for our file
-			while (itr.hasNext()) {
-				FileItem item = itr.next();
-				if (!item.isFormField()) {
-					if (item.getFieldName().equals("upload_jar_file")) {
-
-						// found the file, store it to the specified location
-						filename = item.getName();
-						File file = new File(destinationDir, filename);
-						item.write(file);
-						break;
-					}
-				}
-			}
-		} catch (FileUploadException ex) {
-			resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
-			return;
-		} catch (Exception ex) {
-			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-				"An unknown error occurred during the file upload.");
-			return;
-		}
-
-		// write the okay message
-		resp.sendRedirect(targetPage);
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.fileupload.FileItem;
+import org.apache.commons.fileupload.FileUploadException;
+import org.apache.commons.fileupload.disk.DiskFileItemFactory;
+import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.flink.client.program.PackagedProgram;
+
+/**
+ * A servlet that accepts uploads of pact programs, returns a listing of the
+ * directory containing the job jars, and deletes these jars.
+ */
+public class JobsServlet extends HttpServlet {
+	/**
+	 * Serial UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = -1373210261957434273L;
+
+	// ------------------------------------------------------------------------
+
+	private static final String ACTION_PARAM_NAME = "action";
+
+	private static final String ACTION_LIST_VALUE = "list";
+
+	private static final String ACTION_DELETE_VALUE = "delete";
+
+	private static final String FILENAME_PARAM_NAME = "filename";
+
+	private static final String CONTENT_TYPE_PLAIN = "text/plain";
+
+	/**
+	 * Orders files by their names, as compared by <tt>String.compareTo</tt>.
+	 */
+	private static final Comparator<File> FILE_SORTER = new Comparator<File>() {
+		@Override
+		public int compare(File left, File right) {
+			final String leftName = left.getName();
+			final String rightName = right.getName();
+			return leftName.compareTo(rightName);
+		}
+	};
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The file object for the directory for temporary files.
+	 */
+	private final File tmpDir;
+
+	/**
+	 * The file object for the directory for the submitted jars.
+	 */
+	private final File destinationDir;
+
+	/**
+	 * The name of the file to redirect to after the upload.
+	 */
+	private final String targetPage;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new <tt>JobsServlet</tt> that works on the given directories and redirects to the
+	 * given page after an upload.
+	 * 
+	 * @param jobsDir
+	 *        The directory to store uploaded jobs in.
+	 * @param tmpDir
+	 *        The directory for temporary files.
+	 * @param targetPage
+	 *        The page to redirect to after a successful upload.
+	 */
+	public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
+		this.destinationDir = jobsDir;
+		this.tmpDir = tmpDir;
+		this.targetPage = targetPage;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Initializes the servlet and verifies that both the directory for temporary files and the
+	 * directory for the submitted jars are writable directories.
+	 * 
+	 * @param config
+	 *        The servlet configuration, which is passed on to the super class.
+	 * @throws ServletException
+	 *         Thrown, if one of the two directories is not a writable directory.
+	 */
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		super.init(config);
+
+		if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
+			throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
+		}
+
+		if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
+			throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
+		}
+	}
+
+	/**
+	 * Handles the <tt>list</tt> and <tt>delete</tt> actions.
+	 * <p>
+	 * <tt>list</tt> writes one plain-text line per <tt>.jar</tt> file in the jar directory, sorted by
+	 * file name. Each line holds the file name, a tab, the modification time, and, if the manifest
+	 * of the jar names an entry class, another tab followed by that class.
+	 * <p>
+	 * <tt>delete</tt> deletes the file named by the <tt>filename</tt> parameter from the jar
+	 * directory. The name must not contain path separators. The status is 400 for a missing or
+	 * invalid name, 404 if no such regular file exists, 500 if the deletion fails, and 200 otherwise.
+	 * <p>
+	 * Any other (or missing) action is answered with status 400.
+	 * 
+	 * @param req
+	 *        The request carrying the action and its parameters.
+	 * @param resp
+	 *        The response that receives the listing or the status.
+	 * @throws IOException
+	 *         Thrown, if the response could not be written.
+	 */
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String action = req.getParameter(ACTION_PARAM_NAME);
+
+		// the constants are the receivers of equals(), so that a missing action is handled
+		// by the final else branch instead of causing a NullPointerException
+		if (ACTION_LIST_VALUE.equals(action)) {
+			File[] files = destinationDir.listFiles();
+			if (files == null) {
+				// listFiles() returns null, if the directory could not be listed
+				resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+					"The directory of the submitted jars could not be listed.");
+				return;
+			}
+			Arrays.<File> sort(files, FILE_SORTER);
+
+			GregorianCalendar cal = new GregorianCalendar();
+
+			resp.setStatus(HttpServletResponse.SC_OK);
+			resp.setContentType(CONTENT_TYPE_PLAIN);
+
+			PrintWriter writer = resp.getWriter();
+			for (File file : files) {
+				if (!file.getName().endsWith(".jar")) {
+					continue;
+				}
+
+				// the entry class is an optional third column of the line
+				String entryClass = readEntryClass(file);
+				String assemblerClass = entryClass == null ? "" : '\t' + entryClass;
+
+				cal.setTimeInMillis(file.lastModified());
+				writer.println(file.getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
+					+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
+					+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
+					+ cal.get(GregorianCalendar.SECOND) + assemblerClass);
+			}
+		} else if (ACTION_DELETE_VALUE.equals(action)) {
+			String filename = req.getParameter(FILENAME_PARAM_NAME);
+
+			if (filename == null || filename.length() == 0
+				|| filename.indexOf('/') >= 0 || filename.indexOf('\\') >= 0)
+			{
+				// a name with path separators could address files outside of the jar directory
+				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			} else {
+				File f = new File(destinationDir, filename);
+				if (!f.exists() || f.isDirectory()) {
+					resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
+				} else if (f.delete()) {
+					resp.setStatus(HttpServletResponse.SC_OK);
+				} else {
+					resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+				}
+			}
+		} else {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+		}
+	}
+
+	/**
+	 * Reads the entry class of the given jar file from its manifest. The assembler class attribute
+	 * takes precedence over the main class attribute. The jar file is closed before returning.
+	 * 
+	 * @param jarPath
+	 *        The jar file to inspect.
+	 * @return The entry class, or null, if the manifest names none or the jar could not be read.
+	 */
+	private static String readEntryClass(File jarPath) {
+		JarFile jar = null;
+		try {
+			jar = new JarFile(jarPath);
+			Manifest manifest = jar.getManifest();
+			if (manifest == null) {
+				return null;
+			}
+
+			String entryClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+			if (entryClass == null) {
+				entryClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+			}
+			return entryClass;
+		} catch (IOException ioex) {
+			// an unreadable jar is still listed, only without an entry class
+			return null;
+		} finally {
+			if (jar != null) {
+				try {
+					jar.close();
+				} catch (IOException ignored) {
+					// nothing can be done about a failed close of a jar that was only read
+				}
+			}
+		}
+	}
+
+	/**
+	 * Accepts a multipart upload and stores the file of the form field <tt>upload_jar_file</tt> in
+	 * the jar directory. Only the last path segment of the submitted file name is used as the name
+	 * of the stored file. After the request was processed without an error, the client is redirected
+	 * to the target page.
+	 * 
+	 * @param req
+	 *        The multipart request carrying the file.
+	 * @param resp
+	 *        The response that receives the redirect or the error.
+	 * @throws IOException
+	 *         Thrown, if the response could not be written.
+	 */
+	@Override
+	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		// check, if we are doing the right request
+		if (!ServletFileUpload.isMultipartContent(req)) {
+			resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		// create the disk file factory; items above the 20 MB threshold are stored in the
+		// temp directory rather than in memory (the threshold does not limit the upload size)
+		DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
+		fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
+		fileItemFactory.setRepository(tmpDir);
+
+		// parse the request
+		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
+		try {
+			Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
+
+			// go over the form fields and look for our file
+			while (itr.hasNext()) {
+				FileItem item = itr.next();
+				if (item.isFormField() || !item.getFieldName().equals("upload_jar_file")) {
+					continue;
+				}
+
+				// the name is supplied by the client and may contain a path, which must not
+				// decide where the file is stored
+				String filename = stripPath(item.getName());
+				if (filename.length() == 0 || filename.equals(".") || filename.equals("..")) {
+					resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "No valid file name was provided.");
+					return;
+				}
+
+				// found the file, store it to the specified location
+				item.write(new File(destinationDir, filename));
+				break;
+			}
+		} catch (FileUploadException ex) {
+			resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
+			return;
+		} catch (Exception ex) {
+			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+				"An unknown error occurred during the file upload.");
+			return;
+		}
+
+		// write the okay message
+		resp.sendRedirect(targetPage);
+	}
+
+	/**
+	 * Returns the part of the given name that follows its last <tt>/</tt> or <tt>\</tt>.
+	 * 
+	 * @param submittedName
+	 *        The file name as submitted by the client, possibly null.
+	 * @return The name without any leading path, or the empty string, if the name is null.
+	 */
+	private static String stripPath(String submittedName) {
+		if (submittedName == null) {
+			return "";
+		}
+		int lastSeparator = Math.max(submittedName.lastIndexOf('/'), submittedName.lastIndexOf('\\'));
+		return submittedName.substring(lastSeparator + 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
index cdef605..019fc50 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
@@ -44,6 +44,8 @@ public class PactJobJSONServlet extends HttpServlet {
 
 	private static final String JOB_PARAM_NAME = "job";
 
+	private static final String CLASS_PARAM_NAME = "assemblerClass";
+
 	// ------------------------------------------------------------------------
 
 	private final File jobStoreDirectory; // the directory in which the jobs are stored
@@ -74,7 +76,7 @@ public class PactJobJSONServlet extends HttpServlet {
 		// create the pact plan
 		PackagedProgram pactProgram;
 		try {
-			pactProgram = new PackagedProgram(jarFile, new String[0]);
+			pactProgram = new PackagedProgram(jarFile, req.getParameter(CLASS_PARAM_NAME), new String[0]);
 		}
 		catch (Throwable t) {
 			LOG.info("Instantiating the PactProgram for '" + jarFile.getName() + "' failed.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/resources/web-docs/js/program.js
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/resources/web-docs/js/program.js b/flink-clients/src/main/resources/web-docs/js/program.js
index c443dab..619deb8 100644
--- a/flink-clients/src/main/resources/web-docs/js/program.js
+++ b/flink-clients/src/main/resources/web-docs/js/program.js
@@ -1,21 +1,21 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 var maxColumnWidth = 200;
 var minColumnWidth = 100;
 
@@ -77,6 +77,7 @@ function toggleCheckboxes(box)
     $('.jobItemCheckbox').attr('checked', false);
     box.attr('checked', true);
     var id = box.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+    var assemblerClass = box.attr('id');
 
     $('#mainCanvas').html('');
     $('#planDescription').html('');
@@ -85,7 +86,7 @@ function toggleCheckboxes(box)
     $.ajax({
         type: "GET",
         url: "pactPlan",
-        data: { job: id },
+        data: { job: id, assemblerClass: assemblerClass},
         success: function(response) { showPreviewPlan(response); }
     });
   }
@@ -154,34 +155,35 @@ function createJobList(data)
   {
     if (lines[i] == null || lines[i].length == 0) {
       continue;
-
     }
     
-    var name = null;
-    var date = null;
-    var tabIx = lines[i].indexOf("\t");
-
-    if (tabIx > 0) {
-      name = lines[i].substr(0, tabIx);
-      if (tabIx < lines[i].length - 1) {
-        date = lines[i].substr(tabIx + 1);
-      }
-      else {
-        date = "unknown date";
+    var name = lines[i];
+    var date = "unknown date";
+    var assemblerClass = "<em>no entry class specified</em>";
+    
+    var tokens = lines[i].split("\t");
+    if (tokens.length > 0) {
+      name = tokens[0];
+      if (tokens.length > 1) {
+        date = tokens[1];
+        if (tokens.length > 2) {
+          assemblerClass = tokens[2];
+        }
       }
     }
-    else {
-      name = lines[i];
-      date = "unknown date";
-    }
     
+    var classes = assemblerClass.split(",");
     
     markup += '<div id="job_' + name + '" class="JobListItems"><table class="table"><tr>';
-    markup += '<td width="30px;"><input class="jobItemCheckbox" type="checkbox"></td>';
-    markup += '<td><p class="JobListItemsName">' + name + '</p></td>';
+    markup += '<td colspan="2"><p class="JobListItemsName">' + name + '</p></td>';
     markup += '<td><p class="JobListItemsDate">' + date + '</p></td>';
-    markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td>';
-    markup += '</tr></table></div>';
+    markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td></tr>';
+    
+    for (var idx in classes) {
+      markup += '<tr><td width="30px;"><input id="' + classes[idx] + '" class="jobItemCheckbox" type="checkbox"></td>';
+      markup += '<td colspan="3"><p class="JobListItemsDate">' + classes[idx] + '</p></td></tr>';
+    }
+    markup += '</table></div>';
   }
   
   // add the contents
@@ -207,11 +209,18 @@ function runJob ()
    }
    
    var jobName = job.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+   var assemblerClass = job.attr('id');
+   
    var showPlan = $('#showPlanCheck').is(':checked');
    var suspendPlan = $('#suspendJobDuringPlanCheck').is(':checked');
    var args = $('#commandLineArgsField').attr('value'); //TODO? Replace with .val() ?
    
-   var url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   var url;
+   if (assemblerClass == "<em>no entry class specified</em>") {
+      url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   } else {
+      url = "runJob?" + $.param({ action: "submit", job: jobName, assemblerClass: assemblerClass, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   }
    
    window.location = url;
 }