You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/07/01 12:56:57 UTC

[2/2] flink git commit: [FLINK-2175] added UI support for multiple "assembler class" entries in jar files - program-class can contain multiple comma-separated entry classes - Main-Class can contain multiple comma-separated entry classes - WebClient

[FLINK-2175] added UI support for multiple "assembler class" entries in jar files
  - program-class can contain multiple comma-separated entry classes
  - Main-Class can contain multiple comma-separated entry classes
  - WebClient shows checkbox for each entry class and can display plan for it


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e75c7b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e75c7b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e75c7b3

Branch: refs/heads/master
Commit: 7e75c7b3cc89d8b81341dea6911c39cbfaa6138d
Parents: abaf4af
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Sun May 17 22:25:17 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jul 1 12:56:41 2015 +0200

----------------------------------------------------------------------
 .../flink/client/web/JobSubmissionServlet.java  | 987 ++++++++++---------
 .../apache/flink/client/web/JobsServlet.java    | 442 +++++----
 .../flink/client/web/PactJobJSONServlet.java    |   4 +-
 .../src/main/resources/web-docs/js/program.js   |  89 +-
 4 files changed, 777 insertions(+), 745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 205d0b2..5ec698b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -1,492 +1,495 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class JobSubmissionServlet extends HttpServlet {
-	
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8447312301029847397L;
-
-	// ------------------------------------------------------------------------
-
-	public static final String START_PAGE_URL = "launch.html";
-
-	private static final String ACTION_PARAM_NAME = "action";
-
-	private static final String ACTION_SUBMIT_VALUE = "submit";
-
-	private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
-
-	private static final String ACTION_BACK_VALUE = "back";
-
-	private static final String JOB_PARAM_NAME = "job";
-
-	private static final String ARGUMENTS_PARAM_NAME = "arguments";
-
-	private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
-
-	private static final String SUSPEND_PARAM_NAME = "suspend";
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
-
-	// ------------------------------------------------------------------------
-
-	private final File jobStoreDirectory;				// the directory containing the uploaded jobs
-
-	private final File planDumpDirectory;				// the directory to dump the optimizer plans to
-
-	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
-
-	private final Random rand;							// random number generator for UID
-	
-	private final Configuration config;
-
-
-	public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
-		this.config = config;
-		this.jobStoreDirectory = jobDir;
-		this.planDumpDirectory = planDir;
-
-		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
-
-		this.rand = new Random(System.currentTimeMillis());
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		String action = req.getParameter(ACTION_PARAM_NAME);
-		if (checkParameterSet(resp, action, "action")) {
-			return;
-		}
-
-		// decide according to the action
-		if (action.equals(ACTION_SUBMIT_VALUE)) {
-			// --------------- submit a job -------------------
-
-			// get the parameters
-			String jobName = req.getParameter(JOB_PARAM_NAME);
-			String args = req.getParameter(ARGUMENTS_PARAM_NAME);
-			String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
-			String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
-
-			// check that all parameters are set
-			if (checkParameterSet(resp, jobName, JOB_PARAM_NAME) || checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
-				|| checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
-				|| checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
-			{
-				showErrorPage(resp, "Invalid request, missing parameters.");
-				return;
-			}
-
-			boolean show = Boolean.parseBoolean(showPlan);
-			boolean suspend = Boolean.parseBoolean(suspendPlan);
-
-			// check, if the jar exists
-			File jarFile = new File(jobStoreDirectory, jobName);
-			if (!jarFile.exists()) {
-				showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
-				return;
-			}
-
-			// parse the arguments
-			List<String> params;
-			try {
-				params = tokenizeArguments(args);
-			} catch (IllegalArgumentException iaex) {
-				showErrorPage(resp, "The arguments contain an unterminated quoted string.");
-				return;
-			}
-
-			String assemblerClass = null;
-			int parallelism = -1;
-			while(params.size() >= 2) {
-				if (params.get(0).equals("-c")) {
-					assemblerClass = params.get(1);
-					params.remove(0);
-					params.remove(0);
-				}
-				else if (params.get(0).equals("-p")) {
-					parallelism = Integer.parseInt(params.get(1));
-					params.remove(0);
-					params.remove(0);
-				}
-				else {
-					break;
-				}
-			}
-
-			// create the plan
-			String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
-			PackagedProgram program;
-			FlinkPlan optPlan;
-			Client client;
-			
-			try {
-				if (assemblerClass == null) {
-					program = new PackagedProgram(jarFile, options);
-				} else {
-					program = new PackagedProgram(jarFile, assemblerClass, options);
-				}
-				
-				client = new Client(config, program.getUserCodeClassLoader());
-				
-				optPlan = client.getOptimizedPlan(program, parallelism);
-				
-				if (optPlan == null) {
-					throw new Exception("The optimized plan could not be produced.");
-				}
-			}
-			catch (ProgramInvocationException e) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				
-				if (e.getCause() == null) {
-					e.printStackTrace(w);
-				} else {
-					e.getCause().printStackTrace(w);
-				}
-
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-				
-				showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
-					+ e.getMessage() + "<br/>"
-					+ "<br/><br/><pre>" + message + "</pre>");
-				return;
-			}
-			catch (CompilerException cex) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				cex.printStackTrace(w);
-				
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-
-				showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
-					+ cex.getMessage() + "<br/>"
-					+ (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
-					+ "<br/><br/><pre>" + message + "</pre>");
-				return;
-			}
-			catch (Throwable t) {
-				// collect the stack trace
-				StringWriter sw = new StringWriter();
-				PrintWriter w = new PrintWriter(sw);
-				t.printStackTrace(w);
-
-				String message = sw.toString();
-				message = StringEscapeUtils.escapeHtml4(message);
-				
-				showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
-					+ message + "</pre>");
-				return;
-			}
-
-			// redirect according to our options
-			if (show) {
-				// we have a request to show the plan
-
-				// create a UID for the job
-				Long uid;
-				do {
-					uid = Math.abs(this.rand.nextLong());
-				} while (this.submittedJobs.containsKey(uid));
-
-				// dump the job to a JSON file
-				String planName = uid + ".json";
-				File jsonFile = new File(this.planDumpDirectory, planName);
-				
-				if (optPlan instanceof StreamingPlan) {
-					((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
-				}
-				else {
-					PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-					jsonGen.setEncodeForHTML(true);
-					jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
-				}
-				
-				// submit the job only, if it should not be suspended
-				if (!suspend) {
-					if (optPlan instanceof OptimizedPlan) {
-						try {
-							client.run(program, (OptimizedPlan) optPlan, false);
-						}
-						catch (Throwable t) {
-							LOG.error("Error submitting job to the job-manager.", t);
-							showErrorPage(resp, t.getMessage());
-							return;
-						}
-						finally {
-							program.deleteExtractedLibraries();
-						}
-					}
-					else {
-						throw new RuntimeException("Not implemented for Streaming Job plans");
-					}
-				}
-				else {
-					try {
-						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
-					}
-					catch (ProgramInvocationException piex) {
-						LOG.error("Error creating JobGraph from optimized plan.", piex);
-						showErrorPage(resp, piex.getMessage());
-						return;
-					}
-					catch (Throwable t) {
-						LOG.error("Error creating JobGraph from optimized plan.", t);
-						showErrorPage(resp, t.getMessage());
-						return;
-					}
-				}
-
-				// redirect to the plan display page
-				resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
-			}
-			else {
-				// don't show any plan. directly submit the job and redirect to the
-				// runtime monitor
-				try {
-					client.run(program, parallelism, false);
-				}
-				catch (Exception ex) {
-					LOG.error("Error submitting job to the job-manager.", ex);
-					// HACK: Is necessary because Message contains whole stack trace
-					String errorMessage = ex.getMessage().split("\n")[0];
-					showErrorPage(resp, errorMessage);
-					return;
-				}
-				finally {
-					program.deleteExtractedLibraries();
-				}
-				resp.sendRedirect(START_PAGE_URL);
-			}
-		}
-		else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
-			// --------------- run a job that has been submitted earlier, but was -------------------
-			// --------------- not executed because of a plan display -------------------
-
-			String id = req.getParameter("id");
-			if (checkParameterSet(resp, id, "id")) {
-				return;
-			}
-
-			Long uid = null;
-			try {
-				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
-				showErrorPage(resp, "An invalid id for the job was provided.");
-				return;
-			}
-
-			// get the retained job
-			JobGraph job = submittedJobs.remove(uid);
-			if (job == null) {
-				resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
-					"No job with the given uid was retained for later submission.");
-				return;
-			}
-
-			// submit the job
-			try {
-				Client client = new Client(config, getClass().getClassLoader());
-				client.run(job, false);
-			}
-			catch (Exception ex) {
-				LOG.error("Error submitting job to the job-manager.", ex);
-				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-				// HACK: Is necessary because Message contains whole stack trace
-				String errorMessage = ex.getMessage().split("\n")[0];
-				resp.getWriter().print(errorMessage);
-				// resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
-				return;
-			}
-
-			// redirect to the start page
-			resp.sendRedirect(START_PAGE_URL);
-		} else if (action.equals(ACTION_BACK_VALUE)) {
-			// remove the job from the map
-
-			String id = req.getParameter("id");
-			if (checkParameterSet(resp, id, "id")) {
-				return;
-			}
-
-			Long uid = null;
-			try {
-				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
-				showErrorPage(resp, "An invalid id for the job was provided.");
-				return;
-			}
-
-			// remove the retained job
-			submittedJobs.remove(uid);
-
-			// redirect to the start page
-			resp.sendRedirect(START_PAGE_URL);
-		} else {
-			showErrorPage(resp, "Invalid action specified.");
-			return;
-		}
-	}
-
-	/**
-	 * Prints the error page, containing the given message.
-	 * 
-	 * @param resp
-	 *        The response handler.
-	 * @param message
-	 *        The message to display.
-	 * @throws IOException
-	 *         Thrown, if the error page could not be printed due to an I/O problem.
-	 */
-	private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
-
-		PrintWriter writer = resp.getWriter();
-
-		writer
-			.println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n        \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
-		writer.println("<html>");
-		writer.println("<head>");
-		writer.println("  <title>Launch Job - Error</title>");
-		writer.println("  <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
-		writer.println("  <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
-		writer.println("</head>");
-
-		writer.println("<body>");
-		writer.println("  <div class=\"mainHeading\">");
-		writer.println("    <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
-		writer.println("  </div>");
-		writer.println("  <div style=\"margin-top: 50px; text-align: center;\">");
-		writer.println("    <p class=\"error_text\" style=\"font-size: 18px;\">");
-		writer.println(message);
-		writer.println("    </p><br/><br/>");
-		writer.println("    <form action=\"launch.html\" method=\"GET\">");
-		writer.println("      <input type=\"submit\" value=\"back\">");
-		writer.println("    </form>");
-		writer.println("  </div>");
-		writer.println("</body>");
-		writer.println("</html>");
-	}
-
-	/**
-	 * Checks the given parameter. If it is null, it prints the error page.
-	 * 
-	 * @param resp
-	 *        The response handler.
-	 * @param parameter
-	 *        The parameter to check.
-	 * @param parameterName
-	 *        The name of the parameter, to describe it in the error message.
-	 * @return True, if the parameter is null, false otherwise.
-	 * @throws IOException
-	 *         Thrown, if the error page could not be printed.
-	 */
-	private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
-			throws IOException {
-		if (parameter == null) {
-			showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	/**
-	 * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
-	 * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
-	 * takes care of quotes, such that quoted passages end up being one string.
-	 * 
-	 * @param args
-	 *        The string to be split.
-	 * @return The array of split strings.
-	 */
-	private static final List<String> tokenizeArguments(String args) {
-		List<String> list = new ArrayList<String>();
-		StringBuilder curr = new StringBuilder();
-
-		int pos = 0;
-		boolean quoted = false;
-
-		while (pos < args.length()) {
-			char c = args.charAt(pos);
-			if ((c == ' ' || c == '\t') && !quoted) {
-				if (curr.length() > 0) {
-					list.add(curr.toString());
-					curr.setLength(0);
-				}
-			} else if (c == '"') {
-				quoted = !quoted;
-			} else {
-				curr.append(c);
-			}
-
-			pos++;
-		}
-
-		if (quoted) {
-			throw new IllegalArgumentException("Unterminated quoted string.");
-		}
-
-		if (curr.length() > 0) {
-			list.add(curr.toString());
-		}
-
-		return list;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobSubmissionServlet extends HttpServlet {
+
+	/**
+	 * Serial UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = 8447312301029847397L;
+
+	// ------------------------------------------------------------------------
+
+	public static final String START_PAGE_URL = "launch.html";
+
+	private static final String ACTION_PARAM_NAME = "action";
+
+	private static final String ACTION_SUBMIT_VALUE = "submit";
+
+	private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
+
+	private static final String ACTION_BACK_VALUE = "back";
+
+	private static final String JOB_PARAM_NAME = "job";
+
+	private static final String CLASS_PARAM_NAME = "assemblerClass";
+
+	private static final String ARGUMENTS_PARAM_NAME = "arguments";
+
+	private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
+
+	private static final String SUSPEND_PARAM_NAME = "suspend";
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
+
+	// ------------------------------------------------------------------------
+
+	private final File jobStoreDirectory;				// the directory containing the uploaded jobs
+
+	private final File planDumpDirectory;				// the directory to dump the optimizer plans to
+
+	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
+
+	private final Random rand;							// random number generator for UID
+
+	private final Configuration config;
+
+
+	public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
+		this.config = config;
+		this.jobStoreDirectory = jobDir;
+		this.planDumpDirectory = planDir;
+
+		this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
+
+		this.rand = new Random(System.currentTimeMillis());
+	}
+
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String action = req.getParameter(ACTION_PARAM_NAME);
+		if (checkParameterSet(resp, action, "action")) {
+			return;
+		}
+
+		// decide according to the action
+		if (action.equals(ACTION_SUBMIT_VALUE)) {
+			// --------------- submit a job -------------------
+
+			// get the parameters
+			String jobName = req.getParameter(JOB_PARAM_NAME);
+			String assemblerClass = req.getParameter(CLASS_PARAM_NAME);
+			String args = req.getParameter(ARGUMENTS_PARAM_NAME);
+			String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
+			String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
+
+			// check that all parameters are set
+			// do NOT check 'assemblerClass' -> it is OK if it is not set
+			if (checkParameterSet(resp, jobName, JOB_PARAM_NAME)
+				|| checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
+				|| checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
+				|| checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
+			{
+				return;
+			}
+
+			boolean show = Boolean.parseBoolean(showPlan);
+			boolean suspend = Boolean.parseBoolean(suspendPlan);
+
+			// check, if the jar exists
+			File jarFile = new File(jobStoreDirectory, jobName);
+			if (!jarFile.exists()) {
+				showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
+				return;
+			}
+
+			// parse the arguments
+			List<String> params;
+			try {
+				params = tokenizeArguments(args);
+			} catch (IllegalArgumentException iaex) {
+				showErrorPage(resp, "The arguments contain an unterminated quoted string.");
+				return;
+			}
+
+			int parallelism = -1;
+			while(params.size() >= 2) {
+				if (params.get(0).equals("-c")) {
+					assemblerClass = params.get(1);
+					params.remove(0);
+					params.remove(0);
+				}
+				else if (params.get(0).equals("-p")) {
+					parallelism = Integer.parseInt(params.get(1));
+					params.remove(0);
+					params.remove(0);
+				}
+				else {
+					break;
+				}
+			}
+
+			// create the plan
+			String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
+			PackagedProgram program;
+			FlinkPlan optPlan;
+			Client client;
+
+			try {
+				if (assemblerClass == null) {
+					program = new PackagedProgram(jarFile, options);
+				} else {
+					program = new PackagedProgram(jarFile, assemblerClass, options);
+				}
+
+				client = new Client(config, program.getUserCodeClassLoader());
+
+				optPlan = client.getOptimizedPlan(program, parallelism);
+
+				if (optPlan == null) {
+					throw new Exception("The optimized plan could not be produced.");
+				}
+			}
+			catch (ProgramInvocationException e) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+
+				if (e.getCause() == null) {
+					e.printStackTrace(w);
+				} else {
+					e.getCause().printStackTrace(w);
+				}
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
+					+ e.getMessage() + "<br/>"
+					+ "<br/><br/><pre>" + message + "</pre>");
+				return;
+			}
+			catch (CompilerException cex) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+				cex.printStackTrace(w);
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
+					+ cex.getMessage() + "<br/>"
+					+ (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
+					+ "<br/><br/><pre>" + message + "</pre>");
+				return;
+			}
+			catch (Throwable t) {
+				// collect the stack trace
+				StringWriter sw = new StringWriter();
+				PrintWriter w = new PrintWriter(sw);
+				t.printStackTrace(w);
+
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+
+				showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
+					+ message + "</pre>");
+				return;
+			}
+
+			// redirect according to our options
+			if (show) {
+				// we have a request to show the plan
+
+				// create a UID for the job
+				Long uid;
+				do {
+					uid = Math.abs(this.rand.nextLong());
+				} while (this.submittedJobs.containsKey(uid));
+
+				// dump the job to a JSON file
+				String planName = uid + ".json";
+				File jsonFile = new File(this.planDumpDirectory, planName);
+
+				if (optPlan instanceof StreamingPlan) {
+					((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
+				}
+				else {
+					PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+					jsonGen.setEncodeForHTML(true);
+					jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
+				}
+
+				// submit the job only, if it should not be suspended
+				if (!suspend) {
+					if (optPlan instanceof OptimizedPlan) {
+						try {
+							client.run(program, (OptimizedPlan) optPlan, false);
+						}
+						catch (Throwable t) {
+							LOG.error("Error submitting job to the job-manager.", t);
+							showErrorPage(resp, t.getMessage());
+							return;
+						}
+						finally {
+							program.deleteExtractedLibraries();
+						}
+					}
+					else {
+						throw new RuntimeException("Not implemented for Streaming Job plans");
+					}
+				}
+				else {
+					try {
+						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
+					}
+					catch (ProgramInvocationException piex) {
+						LOG.error("Error creating JobGraph from optimized plan.", piex);
+						showErrorPage(resp, piex.getMessage());
+						return;
+					}
+					catch (Throwable t) {
+						LOG.error("Error creating JobGraph from optimized plan.", t);
+						showErrorPage(resp, t.getMessage());
+						return;
+					}
+				}
+
+				// redirect to the plan display page
+				resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
+			}
+			else {
+				// don't show any plan. directly submit the job and redirect to the
+				// runtime monitor
+				try {
+					client.run(program, parallelism, false);
+				}
+				catch (Exception ex) {
+					LOG.error("Error submitting job to the job-manager.", ex);
+					// HACK: Is necessary because Message contains whole stack trace
+					String errorMessage = ex.getMessage().split("\n")[0];
+					showErrorPage(resp, errorMessage);
+					return;
+				}
+				finally {
+					program.deleteExtractedLibraries();
+				}
+				resp.sendRedirect(START_PAGE_URL);
+			}
+		}
+		else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
+			// --------------- run a job that has been submitted earlier, but was -------------------
+			// --------------- not executed because of a plan display -------------------
+
+			String id = req.getParameter("id");
+			if (checkParameterSet(resp, id, "id")) {
+				return;
+			}
+
+			Long uid = null;
+			try {
+				uid = Long.parseLong(id);
+			} catch (NumberFormatException nfex) {
+				showErrorPage(resp, "An invalid id for the job was provided.");
+				return;
+			}
+
+			// get the retained job
+			JobGraph job = submittedJobs.remove(uid);
+			if (job == null) {
+				resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
+					"No job with the given uid was retained for later submission.");
+				return;
+			}
+
+			// submit the job
+			try {
+				Client client = new Client(config, getClass().getClassLoader());
+				client.run(job, false);
+			}
+			catch (Exception ex) {
+				LOG.error("Error submitting job to the job-manager.", ex);
+				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+				// HACK: Is necessary because Message contains whole stack trace
+				String errorMessage = ex.getMessage().split("\n")[0];
+				resp.getWriter().print(errorMessage);
+				// resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
+				return;
+			}
+
+			// redirect to the start page
+			resp.sendRedirect(START_PAGE_URL);
+		} else if (action.equals(ACTION_BACK_VALUE)) {
+			// remove the job from the map
+
+			String id = req.getParameter("id");
+			if (checkParameterSet(resp, id, "id")) {
+				return;
+			}
+
+			Long uid = null;
+			try {
+				uid = Long.parseLong(id);
+			} catch (NumberFormatException nfex) {
+				showErrorPage(resp, "An invalid id for the job was provided.");
+				return;
+			}
+
+			// remove the retained job
+			submittedJobs.remove(uid);
+
+			// redirect to the start page
+			resp.sendRedirect(START_PAGE_URL);
+		} else {
+			showErrorPage(resp, "Invalid action specified.");
+			return;
+		}
+	}
+
+	/**
+	 * Prints the error page, containing the given message.
+	 * 
+	 * @param resp
+	 *        The response handler.
+	 * @param message
+	 *        The message to display.
+	 * @throws IOException
+	 *         Thrown, if the error page could not be printed due to an I/O problem.
+	 */
+	private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
+		resp.setStatus(HttpServletResponse.SC_OK);
+		resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
+
+		PrintWriter writer = resp.getWriter();
+
+		writer
+			.println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n        \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
+		writer.println("<html>");
+		writer.println("<head>");
+		writer.println("  <title>Launch Job - Error</title>");
+		writer.println("  <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
+		writer.println("  <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
+		writer.println("</head>");
+
+		writer.println("<body>");
+		writer.println("  <div class=\"mainHeading\">");
+		writer.println("    <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
+		writer.println("  </div>");
+		writer.println("  <div style=\"margin-top: 50px; text-align: center;\">");
+		writer.println("    <p class=\"error_text\" style=\"font-size: 18px;\">");
+		writer.println(message);
+		writer.println("    </p><br/><br/>");
+		writer.println("    <form action=\"launch.html\" method=\"GET\">");
+		writer.println("      <input type=\"submit\" value=\"back\">");
+		writer.println("    </form>");
+		writer.println("  </div>");
+		writer.println("</body>");
+		writer.println("</html>");
+	}
+
+	/**
+	 * Checks the given parameter. If it is null, it prints the error page.
+	 * 
+	 * @param resp
+	 *        The response handler.
+	 * @param parameter
+	 *        The parameter to check.
+	 * @param parameterName
+	 *        The name of the parameter, to describe it in the error message.
+	 * @return True, if the parameter is null, false otherwise.
+	 * @throws IOException
+	 *         Thrown, if the error page could not be printed.
+	 */
+	private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
+			throws IOException {
+		if (parameter == null) {
+			showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
+	 * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
+	 * takes care of quotes, such that quoted passages end up being one string.
+	 * 
+	 * @param args
+	 *        The string to be split.
+	 * @return The array of split strings.
+	 */
+	private static final List<String> tokenizeArguments(String args) {
+		List<String> list = new ArrayList<String>();
+		StringBuilder curr = new StringBuilder();
+
+		int pos = 0;
+		boolean quoted = false;
+
+		while (pos < args.length()) {
+			char c = args.charAt(pos);
+			if ((c == ' ' || c == '\t') && !quoted) {
+				if (curr.length() > 0) {
+					list.add(curr.toString());
+					curr.setLength(0);
+				}
+			} else if (c == '"') {
+				quoted = !quoted;
+			} else {
+				curr.append(c);
+			}
+
+			pos++;
+		}
+
+		if (quoted) {
+			throw new IllegalArgumentException("Unterminated quoted string.");
+		}
+
+		if (curr.length() > 0) {
+			list.add(curr.toString());
+		}
+
+		return list;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index b3c9cc4..cee13dd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -1,212 +1,230 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.FileUploadException;
-import org.apache.commons.fileupload.disk.DiskFileItemFactory;
-import org.apache.commons.fileupload.servlet.ServletFileUpload;
-
-/**
- * A servlet that accepts uploads of pact programs, returns a listing of the
- * directory containing the job jars, and deleting these jars.
- */
-public class JobsServlet extends HttpServlet {
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = -1373210261957434273L;
-
-	// ------------------------------------------------------------------------
-
-	private static final String ACTION_PARAM_NAME = "action";
-
-	private static final String ACTION_LIST_VALUE = "list";
-
-	private static final String ACTION_DELETE_VALUE = "delete";
-
-	private static final String FILENAME_PARAM_NAME = "filename";
-
-	private static final String CONTENT_TYPE_PLAIN = "text/plain";
-
-	private static final Comparator<File> FILE_SORTER = new Comparator<File>() {
-		@Override
-		public int compare(File o1, File o2) {
-			return o1.getName().compareTo(o2.getName());
-		}
-
-	};
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The file object for the directory for temporary files.
-	 */
-	private final File tmpDir;
-
-	/**
-	 * The file object for the directory for the submitted jars.
-	 */
-	private final File destinationDir;
-
-	/**
-	 * The name of the file to redirect to after the upload.
-	 */
-	private final String targetPage;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new <tt>JobServlet</tt>, configured with the given directories
-	 * and the given result page.
-	 * 
-	 * @param jobsDir
-	 *        The directory to store uploaded jobs in.
-	 * @param tmpDir
-	 *        The directory for temporary files.
-	 * @param targetPage
-	 *        The page to redirect to after a successful upload.
-	 */
-	public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
-		this.tmpDir = tmpDir;
-		this.destinationDir = jobsDir;
-		this.targetPage = targetPage;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public void init(ServletConfig config) throws ServletException {
-		super.init(config);
-
-		if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
-			throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
-		}
-
-		if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
-			throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
-		}
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		String action = req.getParameter(ACTION_PARAM_NAME);
-
-		if (action.equals(ACTION_LIST_VALUE)) {
-			GregorianCalendar cal = new GregorianCalendar();
-
-			File[] files = destinationDir.listFiles();
-			Arrays.<File> sort(files, FILE_SORTER);
-
-			resp.setStatus(HttpServletResponse.SC_OK);
-			resp.setContentType(CONTENT_TYPE_PLAIN);
-
-			PrintWriter writer = resp.getWriter();
-			for (int i = 0; i < files.length; i++) {
-				if (!files[i].getName().endsWith(".jar")) {
-					continue;
-				}
-
-				cal.setTimeInMillis(files[i].lastModified());
-				writer.println(files[i].getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
-					+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
-					+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
-					+ cal.get(GregorianCalendar.SECOND));
-			}
-		} else if (action.equals(ACTION_DELETE_VALUE)) {
-			String filename = req.getParameter(FILENAME_PARAM_NAME);
-
-			if (filename == null || filename.length() == 0) {
-				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			} else {
-				File f = new File(destinationDir, filename);
-				if (!f.exists() || f.isDirectory()) {
-					resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
-				}
-				f.delete();
-				resp.setStatus(HttpServletResponse.SC_OK);
-			}
-		} else {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-		}
-	}
-
-	@Override
-	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		// check, if we are doing the right request
-		if (!ServletFileUpload.isMultipartContent(req)) {
-			resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
-			return;
-		}
-
-		// create the disk file factory, limiting the file size to 20 MB
-		DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
-		fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
-		fileItemFactory.setRepository(tmpDir);
-
-		String filename = null;
-
-		// parse the request
-		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
-		try {
-			Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
-
-			// go over the form fields and look for our file
-			while (itr.hasNext()) {
-				FileItem item = itr.next();
-				if (!item.isFormField()) {
-					if (item.getFieldName().equals("upload_jar_file")) {
-
-						// found the file, store it to the specified location
-						filename = item.getName();
-						File file = new File(destinationDir, filename);
-						item.write(file);
-						break;
-					}
-				}
-			}
-		} catch (FileUploadException ex) {
-			resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
-			return;
-		} catch (Exception ex) {
-			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
-				"An unknown error occurred during the file upload.");
-			return;
-		}
-
-		// write the okay message
-		resp.sendRedirect(targetPage);
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.fileupload.FileItem;
+import org.apache.commons.fileupload.FileUploadException;
+import org.apache.commons.fileupload.disk.DiskFileItemFactory;
+import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.flink.client.program.PackagedProgram;
+
+/**
+ * A servlet that stores uploaded job jars (POST) and, selected by the "action"
+ * request parameter, lists the stored jars with their entry classes or deletes one (GET).
+ */
+public class JobsServlet extends HttpServlet {
+	/**
+	 * Serial UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = -1373210261957434273L;
+
+	// ------------------------------------------------------------------------
+
+	private static final String ACTION_PARAM_NAME = "action"; // GET parameter selecting the operation
+
+	private static final String ACTION_LIST_VALUE = "list"; // action value: list the stored jars
+
+	private static final String ACTION_DELETE_VALUE = "delete"; // action value: delete one stored file
+
+	private static final String FILENAME_PARAM_NAME = "filename"; // GET parameter naming the file to delete
+
+	private static final String CONTENT_TYPE_PLAIN = "text/plain"; // content type of the listing
+
+	private static final Comparator<File> FILE_SORTER = new Comparator<File>() { // orders files by name
+		@Override
+		public int compare(File o1, File o2) {
+			return o1.getName().compareTo(o2.getName());
+		}
+
+	};
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The directory handed to the upload handler as its repository for temporary files.
+	 */
+	private final File tmpDir;
+
+	/**
+	 * The directory in which uploaded jars are stored, and which is listed and deleted from.
+	 */
+	private final File destinationDir;
+
+	/**
+	 * The name of the file to redirect to after the upload.
+	 */
+	private final String targetPage;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new <tt>JobsServlet</tt>, configured with the given directories
+	 * and the given result page. The directories are only validated later, in init().
+	 *
+	 * @param jobsDir
+	 *        The directory to store uploaded jobs in.
+	 * @param tmpDir
+	 *        The directory for temporary files.
+	 * @param targetPage
+	 *        The page to redirect to after a successful upload.
+	 */
+	public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
+		this.tmpDir = tmpDir;
+		this.destinationDir = jobsDir;
+		this.targetPage = targetPage;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public void init(ServletConfig config) throws ServletException { // rejects unusable directories
+		super.init(config);
+
+		if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
+			throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
+		}
+
+		if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
+			throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
+		}
+	}
+
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String action = req.getParameter(ACTION_PARAM_NAME);
+
+		if (action.equals(ACTION_LIST_VALUE)) { // NOTE(review): action is not null-checked before equals()
+			GregorianCalendar cal = new GregorianCalendar();
+
+			File[] files = destinationDir.listFiles(); // NOTE(review): result is used without a null check
+			Arrays.<File> sort(files, FILE_SORTER);
+
+			resp.setStatus(HttpServletResponse.SC_OK);
+			resp.setContentType(CONTENT_TYPE_PLAIN);
+
+			PrintWriter writer = resp.getWriter();
+			for (int i = 0; i < files.length; i++) { // one output line per jar: name, date, entry classes
+				if (!files[i].getName().endsWith(".jar")) { // only files ending in ".jar" are listed
+					continue;
+				}
+
+				JarFile jar = new JarFile(files[i]); // NOTE(review): jar is never closed in this method
+				Manifest manifest = jar.getManifest();
+				String assemblerClass = null;
+				if (manifest != null) {
+					assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+					if (assemblerClass == null) { // fall back to the main-class manifest attribute
+						assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+					}
+				}
+				if (assemblerClass == null) {
+					assemblerClass = ""; // no entry class found: the line gets no third column
+				} else {
+					assemblerClass = '\t' + assemblerClass; // becomes the third, tab-separated column
+				}
+
+				cal.setTimeInMillis(files[i].lastModified());
+				writer.println(files[i].getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/' // month + 1
+					+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
+					+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
+					+ cal.get(GregorianCalendar.SECOND) + assemblerClass); // date fields are not zero-padded
+			}
+		} else if (action.equals(ACTION_DELETE_VALUE)) {
+			String filename = req.getParameter(FILENAME_PARAM_NAME);
+
+			if (filename == null || filename.length() == 0) {
+				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			} else {
+				File f = new File(destinationDir, filename); // NOTE(review): filename is not validated (e.g. "..") - confirm
+				if (!f.exists() || f.isDirectory()) {
+					resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
+				}
+				f.delete(); // NOTE(review): also reached after NOT_FOUND; the boolean result is ignored
+				resp.setStatus(HttpServletResponse.SC_OK); // NOTE(review): overwrites the NOT_FOUND set above
+			}
+		} else {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); // any other action value is rejected
+		}
+	}
+
+	@Override
+	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		// check, if we are doing the right request
+		if (!ServletFileUpload.isMultipartContent(req)) {
+			resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		// create the disk file factory; NOTE(review): setSizeThreshold looks like the in-memory threshold, not an upload limit - confirm
+		DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
+		fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
+		fileItemFactory.setRepository(tmpDir);
+
+		String filename = null;
+
+		// parse the request
+		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
+		try {
+			Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator(); // unchecked cast
+
+			// go over the form fields and look for our file
+			while (itr.hasNext()) {
+				FileItem item = itr.next();
+				if (!item.isFormField()) {
+					if (item.getFieldName().equals("upload_jar_file")) {
+
+						// found the file, store it to the specified location
+						filename = item.getName(); // NOTE(review): client-supplied name is used as-is for the target path
+						File file = new File(destinationDir, filename);
+						item.write(file);
+						break; // only the first matching upload field is stored
+					}
+				}
+			}
+		} catch (FileUploadException ex) {
+			resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
+			return;
+		} catch (Exception ex) { // any other failure in the block above; the exception itself is not logged
+			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+				"An unknown error occurred during the file upload.");
+			return;
+		}
+
+		// redirect to the configured target page (also reached when no matching file field was found)
+		resp.sendRedirect(targetPage);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
index cdef605..019fc50 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
@@ -44,6 +44,8 @@ public class PactJobJSONServlet extends HttpServlet {
 
 	private static final String JOB_PARAM_NAME = "job";
 
+	private static final String CLASS_PARAM_NAME = "assemblerClass";
+
 	// ------------------------------------------------------------------------
 
 	private final File jobStoreDirectory; // the directory in which the jobs are stored
@@ -74,7 +76,7 @@ public class PactJobJSONServlet extends HttpServlet {
 		// create the pact plan
 		PackagedProgram pactProgram;
 		try {
-			pactProgram = new PackagedProgram(jarFile, new String[0]);
+			pactProgram = new PackagedProgram(jarFile, req.getParameter(CLASS_PARAM_NAME), new String[0]);
 		}
 		catch (Throwable t) {
 			LOG.info("Instantiating the PactProgram for '" + jarFile.getName() + "' failed.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/resources/web-docs/js/program.js
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/resources/web-docs/js/program.js b/flink-clients/src/main/resources/web-docs/js/program.js
index c443dab..619deb8 100644
--- a/flink-clients/src/main/resources/web-docs/js/program.js
+++ b/flink-clients/src/main/resources/web-docs/js/program.js
@@ -1,21 +1,21 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 var maxColumnWidth = 200;
 var minColumnWidth = 100;
 
@@ -77,6 +77,7 @@ function toggleCheckboxes(box)
     $('.jobItemCheckbox').attr('checked', false);
     box.attr('checked', true);
     var id = box.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+    var assemblerClass = box.attr('id');
 
     $('#mainCanvas').html('');
     $('#planDescription').html('');
@@ -85,7 +86,7 @@ function toggleCheckboxes(box)
     $.ajax({
         type: "GET",
         url: "pactPlan",
-        data: { job: id },
+        data: { job: id, assemblerClass: assemblerClass},
         success: function(response) { showPreviewPlan(response); }
     });
   }
@@ -154,34 +155,35 @@ function createJobList(data)
   {
     if (lines[i] == null || lines[i].length == 0) {
       continue;
-
     }
     
-    var name = null;
-    var date = null;
-    var tabIx = lines[i].indexOf("\t");
-
-    if (tabIx > 0) {
-      name = lines[i].substr(0, tabIx);
-      if (tabIx < lines[i].length - 1) {
-        date = lines[i].substr(tabIx + 1);
-      }
-      else {
-        date = "unknown date";
+    var name = lines[i];
+    var date = "unknown date";
+    var assemblerClass = "<em>no entry class specified</em>";
+    
+    var tokens = lines[i].split("\t");
+    if (tokens.length > 0) {
+      name = tokens[0];
+      if (tokens.length > 1) {
+        date = tokens[1];
+        if (tokens.length > 2) {
+          assemblerClass = tokens[2];
+        }
       }
     }
-    else {
-      name = lines[i];
-      date = "unknown date";
-    }
     
+    var classes = assemblerClass.split(",");
     
     markup += '<div id="job_' + name + '" class="JobListItems"><table class="table"><tr>';
-    markup += '<td width="30px;"><input class="jobItemCheckbox" type="checkbox"></td>';
-    markup += '<td><p class="JobListItemsName">' + name + '</p></td>';
+    markup += '<td colspan="2"><p class="JobListItemsName">' + name + '</p></td>';
     markup += '<td><p class="JobListItemsDate">' + date + '</p></td>';
-    markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td>';
-    markup += '</tr></table></div>';
+    markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td></tr>';
+    
+    for (var idx in classes) {
+      markup += '<tr><td width="30px;"><input id="' + classes[idx] + '" class="jobItemCheckbox" type="checkbox"></td>';
+      markup += '<td colspan="3"><p class="JobListItemsDate">' + classes[idx] + '</p></td></tr>';
+    }
+    markup += '</table></div>';
   }
   
   // add the contents
@@ -207,11 +209,18 @@ function runJob ()
    }
    
    var jobName = job.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+   var assemblerClass = job.attr('id');
+   
    var showPlan = $('#showPlanCheck').is(':checked');
    var suspendPlan = $('#suspendJobDuringPlanCheck').is(':checked');
    var args = $('#commandLineArgsField').attr('value'); //TODO? Replace with .val() ?
    
-   var url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   var url;
+   if (assemblerClass == "<em>no entry class specified</em>") {
+      url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   } else {
+      url = "runJob?" + $.param({ action: "submit", job: jobName, assemblerClass: assemblerClass, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+   }
    
    window.location = url;
 }