You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/07/01 12:56:57 UTC
[2/2] flink git commit: [FLINK-2175] added UI support for multiple
"assembler class" entries in jar files - program-class can contain multiple
comma separated entry classes - Main-Class can contain multiple comma
separated entry classes - WebClient
[FLINK-2175] added UI support for multiple "assembler class" entries in jar files
- program-class can contain multiple comma separated entry classes
- Main-Class can contain multiple comma separated entry classes
- WebClient shows checkbox for each entry class and can display plan for it
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e75c7b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e75c7b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e75c7b3
Branch: refs/heads/master
Commit: 7e75c7b3cc89d8b81341dea6911c39cbfaa6138d
Parents: abaf4af
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Sun May 17 22:25:17 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jul 1 12:56:41 2015 +0200
----------------------------------------------------------------------
.../flink/client/web/JobSubmissionServlet.java | 987 ++++++++++---------
.../apache/flink/client/web/JobsServlet.java | 442 +++++----
.../flink/client/web/PactJobJSONServlet.java | 4 +-
.../src/main/resources/web-docs/js/program.js | 89 +-
4 files changed, 777 insertions(+), 745 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 205d0b2..5ec698b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -1,492 +1,495 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class JobSubmissionServlet extends HttpServlet {
-
- /**
- * Serial UID for serialization interoperability.
- */
- private static final long serialVersionUID = 8447312301029847397L;
-
- // ------------------------------------------------------------------------
-
- public static final String START_PAGE_URL = "launch.html";
-
- private static final String ACTION_PARAM_NAME = "action";
-
- private static final String ACTION_SUBMIT_VALUE = "submit";
-
- private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
-
- private static final String ACTION_BACK_VALUE = "back";
-
- private static final String JOB_PARAM_NAME = "job";
-
- private static final String ARGUMENTS_PARAM_NAME = "arguments";
-
- private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
-
- private static final String SUSPEND_PARAM_NAME = "suspend";
-
- private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
-
- // ------------------------------------------------------------------------
-
- private final File jobStoreDirectory; // the directory containing the uploaded jobs
-
- private final File planDumpDirectory; // the directory to dump the optimizer plans to
-
- private final Map<Long, JobGraph> submittedJobs; // map from UIDs to the running jobs
-
- private final Random rand; // random number generator for UID
-
- private final Configuration config;
-
-
- public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
- this.config = config;
- this.jobStoreDirectory = jobDir;
- this.planDumpDirectory = planDir;
-
- this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
-
- this.rand = new Random(System.currentTimeMillis());
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- String action = req.getParameter(ACTION_PARAM_NAME);
- if (checkParameterSet(resp, action, "action")) {
- return;
- }
-
- // decide according to the action
- if (action.equals(ACTION_SUBMIT_VALUE)) {
- // --------------- submit a job -------------------
-
- // get the parameters
- String jobName = req.getParameter(JOB_PARAM_NAME);
- String args = req.getParameter(ARGUMENTS_PARAM_NAME);
- String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
- String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
-
- // check that all parameters are set
- if (checkParameterSet(resp, jobName, JOB_PARAM_NAME) || checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
- || checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
- || checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
- {
- showErrorPage(resp, "Invalid request, missing parameters.");
- return;
- }
-
- boolean show = Boolean.parseBoolean(showPlan);
- boolean suspend = Boolean.parseBoolean(suspendPlan);
-
- // check, if the jar exists
- File jarFile = new File(jobStoreDirectory, jobName);
- if (!jarFile.exists()) {
- showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
- return;
- }
-
- // parse the arguments
- List<String> params;
- try {
- params = tokenizeArguments(args);
- } catch (IllegalArgumentException iaex) {
- showErrorPage(resp, "The arguments contain an unterminated quoted string.");
- return;
- }
-
- String assemblerClass = null;
- int parallelism = -1;
- while(params.size() >= 2) {
- if (params.get(0).equals("-c")) {
- assemblerClass = params.get(1);
- params.remove(0);
- params.remove(0);
- }
- else if (params.get(0).equals("-p")) {
- parallelism = Integer.parseInt(params.get(1));
- params.remove(0);
- params.remove(0);
- }
- else {
- break;
- }
- }
-
- // create the plan
- String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
- PackagedProgram program;
- FlinkPlan optPlan;
- Client client;
-
- try {
- if (assemblerClass == null) {
- program = new PackagedProgram(jarFile, options);
- } else {
- program = new PackagedProgram(jarFile, assemblerClass, options);
- }
-
- client = new Client(config, program.getUserCodeClassLoader());
-
- optPlan = client.getOptimizedPlan(program, parallelism);
-
- if (optPlan == null) {
- throw new Exception("The optimized plan could not be produced.");
- }
- }
- catch (ProgramInvocationException e) {
- // collect the stack trace
- StringWriter sw = new StringWriter();
- PrintWriter w = new PrintWriter(sw);
-
- if (e.getCause() == null) {
- e.printStackTrace(w);
- } else {
- e.getCause().printStackTrace(w);
- }
-
- String message = sw.toString();
- message = StringEscapeUtils.escapeHtml4(message);
-
- showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
- + e.getMessage() + "<br/>"
- + "<br/><br/><pre>" + message + "</pre>");
- return;
- }
- catch (CompilerException cex) {
- // collect the stack trace
- StringWriter sw = new StringWriter();
- PrintWriter w = new PrintWriter(sw);
- cex.printStackTrace(w);
-
- String message = sw.toString();
- message = StringEscapeUtils.escapeHtml4(message);
-
- showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
- + cex.getMessage() + "<br/>"
- + (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
- + "<br/><br/><pre>" + message + "</pre>");
- return;
- }
- catch (Throwable t) {
- // collect the stack trace
- StringWriter sw = new StringWriter();
- PrintWriter w = new PrintWriter(sw);
- t.printStackTrace(w);
-
- String message = sw.toString();
- message = StringEscapeUtils.escapeHtml4(message);
-
- showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
- + message + "</pre>");
- return;
- }
-
- // redirect according to our options
- if (show) {
- // we have a request to show the plan
-
- // create a UID for the job
- Long uid;
- do {
- uid = Math.abs(this.rand.nextLong());
- } while (this.submittedJobs.containsKey(uid));
-
- // dump the job to a JSON file
- String planName = uid + ".json";
- File jsonFile = new File(this.planDumpDirectory, planName);
-
- if (optPlan instanceof StreamingPlan) {
- ((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
- }
- else {
- PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
- jsonGen.setEncodeForHTML(true);
- jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
- }
-
- // submit the job only, if it should not be suspended
- if (!suspend) {
- if (optPlan instanceof OptimizedPlan) {
- try {
- client.run(program, (OptimizedPlan) optPlan, false);
- }
- catch (Throwable t) {
- LOG.error("Error submitting job to the job-manager.", t);
- showErrorPage(resp, t.getMessage());
- return;
- }
- finally {
- program.deleteExtractedLibraries();
- }
- }
- else {
- throw new RuntimeException("Not implemented for Streaming Job plans");
- }
- }
- else {
- try {
- this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
- }
- catch (ProgramInvocationException piex) {
- LOG.error("Error creating JobGraph from optimized plan.", piex);
- showErrorPage(resp, piex.getMessage());
- return;
- }
- catch (Throwable t) {
- LOG.error("Error creating JobGraph from optimized plan.", t);
- showErrorPage(resp, t.getMessage());
- return;
- }
- }
-
- // redirect to the plan display page
- resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
- }
- else {
- // don't show any plan. directly submit the job and redirect to the
- // runtime monitor
- try {
- client.run(program, parallelism, false);
- }
- catch (Exception ex) {
- LOG.error("Error submitting job to the job-manager.", ex);
- // HACK: Is necessary because Message contains whole stack trace
- String errorMessage = ex.getMessage().split("\n")[0];
- showErrorPage(resp, errorMessage);
- return;
- }
- finally {
- program.deleteExtractedLibraries();
- }
- resp.sendRedirect(START_PAGE_URL);
- }
- }
- else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
- // --------------- run a job that has been submitted earlier, but was -------------------
- // --------------- not executed because of a plan display -------------------
-
- String id = req.getParameter("id");
- if (checkParameterSet(resp, id, "id")) {
- return;
- }
-
- Long uid = null;
- try {
- uid = Long.parseLong(id);
- } catch (NumberFormatException nfex) {
- showErrorPage(resp, "An invalid id for the job was provided.");
- return;
- }
-
- // get the retained job
- JobGraph job = submittedJobs.remove(uid);
- if (job == null) {
- resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
- "No job with the given uid was retained for later submission.");
- return;
- }
-
- // submit the job
- try {
- Client client = new Client(config, getClass().getClassLoader());
- client.run(job, false);
- }
- catch (Exception ex) {
- LOG.error("Error submitting job to the job-manager.", ex);
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- // HACK: Is necessary because Message contains whole stack trace
- String errorMessage = ex.getMessage().split("\n")[0];
- resp.getWriter().print(errorMessage);
- // resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
- return;
- }
-
- // redirect to the start page
- resp.sendRedirect(START_PAGE_URL);
- } else if (action.equals(ACTION_BACK_VALUE)) {
- // remove the job from the map
-
- String id = req.getParameter("id");
- if (checkParameterSet(resp, id, "id")) {
- return;
- }
-
- Long uid = null;
- try {
- uid = Long.parseLong(id);
- } catch (NumberFormatException nfex) {
- showErrorPage(resp, "An invalid id for the job was provided.");
- return;
- }
-
- // remove the retained job
- submittedJobs.remove(uid);
-
- // redirect to the start page
- resp.sendRedirect(START_PAGE_URL);
- } else {
- showErrorPage(resp, "Invalid action specified.");
- return;
- }
- }
-
- /**
- * Prints the error page, containing the given message.
- *
- * @param resp
- * The response handler.
- * @param message
- * The message to display.
- * @throws IOException
- * Thrown, if the error page could not be printed due to an I/O problem.
- */
- private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
-
- PrintWriter writer = resp.getWriter();
-
- writer
- .println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
- writer.println("<html>");
- writer.println("<head>");
- writer.println(" <title>Launch Job - Error</title>");
- writer.println(" <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
- writer.println(" <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
- writer.println("</head>");
-
- writer.println("<body>");
- writer.println(" <div class=\"mainHeading\">");
- writer.println(" <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
- writer.println(" </div>");
- writer.println(" <div style=\"margin-top: 50px; text-align: center;\">");
- writer.println(" <p class=\"error_text\" style=\"font-size: 18px;\">");
- writer.println(message);
- writer.println(" </p><br/><br/>");
- writer.println(" <form action=\"launch.html\" method=\"GET\">");
- writer.println(" <input type=\"submit\" value=\"back\">");
- writer.println(" </form>");
- writer.println(" </div>");
- writer.println("</body>");
- writer.println("</html>");
- }
-
- /**
- * Checks the given parameter. If it is null, it prints the error page.
- *
- * @param resp
- * The response handler.
- * @param parameter
- * The parameter to check.
- * @param parameterName
- * The name of the parameter, to describe it in the error message.
- * @return True, if the parameter is null, false otherwise.
- * @throws IOException
- * Thrown, if the error page could not be printed.
- */
- private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
- throws IOException {
- if (parameter == null) {
- showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
- * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
- * takes care of quotes, such that quoted passages end up being one string.
- *
- * @param args
- * The string to be split.
- * @return The array of split strings.
- */
- private static final List<String> tokenizeArguments(String args) {
- List<String> list = new ArrayList<String>();
- StringBuilder curr = new StringBuilder();
-
- int pos = 0;
- boolean quoted = false;
-
- while (pos < args.length()) {
- char c = args.charAt(pos);
- if ((c == ' ' || c == '\t') && !quoted) {
- if (curr.length() > 0) {
- list.add(curr.toString());
- curr.setLength(0);
- }
- } else if (c == '"') {
- quoted = !quoted;
- } else {
- curr.append(c);
- }
-
- pos++;
- }
-
- if (quoted) {
- throw new IllegalArgumentException("Unterminated quoted string.");
- }
-
- if (curr.length() > 0) {
- list.add(curr.toString());
- }
-
- return list;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobSubmissionServlet extends HttpServlet {
+
+ /**
+ * Serial UID for serialization interoperability.
+ */
+ private static final long serialVersionUID = 8447312301029847397L;
+
+ // ------------------------------------------------------------------------
+
+ public static final String START_PAGE_URL = "launch.html";
+
+ private static final String ACTION_PARAM_NAME = "action";
+
+ private static final String ACTION_SUBMIT_VALUE = "submit";
+
+ private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
+
+ private static final String ACTION_BACK_VALUE = "back";
+
+ private static final String JOB_PARAM_NAME = "job";
+
+ private static final String CLASS_PARAM_NAME = "assemblerClass";
+
+ private static final String ARGUMENTS_PARAM_NAME = "arguments";
+
+ private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
+
+ private static final String SUSPEND_PARAM_NAME = "suspend";
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
+
+ // ------------------------------------------------------------------------
+
+ private final File jobStoreDirectory; // the directory containing the uploaded jobs
+
+ private final File planDumpDirectory; // the directory to dump the optimizer plans to
+
+ private final Map<Long, JobGraph> submittedJobs; // map from UIDs to the running jobs
+
+ private final Random rand; // random number generator for UID
+
+ private final Configuration config;
+
+
+ public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
+ this.config = config;
+ this.jobStoreDirectory = jobDir;
+ this.planDumpDirectory = planDir;
+
+ this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
+
+ this.rand = new Random(System.currentTimeMillis());
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ String action = req.getParameter(ACTION_PARAM_NAME);
+ if (checkParameterSet(resp, action, "action")) {
+ return;
+ }
+
+ // decide according to the action
+ if (action.equals(ACTION_SUBMIT_VALUE)) {
+ // --------------- submit a job -------------------
+
+ // get the parameters
+ String jobName = req.getParameter(JOB_PARAM_NAME);
+ String assemblerClass = req.getParameter(CLASS_PARAM_NAME);
+ String args = req.getParameter(ARGUMENTS_PARAM_NAME);
+ String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
+ String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
+
+ // check that all parameters are set
+ // do NOT check 'assemblerClass' -> it is OK if it is not set
+ if (checkParameterSet(resp, jobName, JOB_PARAM_NAME)
+ || checkParameterSet(resp, args, ARGUMENTS_PARAM_NAME)
+ || checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME)
+ || checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME))
+ {
+ return;
+ }
+
+ boolean show = Boolean.parseBoolean(showPlan);
+ boolean suspend = Boolean.parseBoolean(suspendPlan);
+
+ // check, if the jar exists
+ File jarFile = new File(jobStoreDirectory, jobName);
+ if (!jarFile.exists()) {
+ showErrorPage(resp, "The jar file + '" + jarFile.getPath() + "' does not exist.");
+ return;
+ }
+
+ // parse the arguments
+ List<String> params;
+ try {
+ params = tokenizeArguments(args);
+ } catch (IllegalArgumentException iaex) {
+ showErrorPage(resp, "The arguments contain an unterminated quoted string.");
+ return;
+ }
+
+ int parallelism = -1;
+ while(params.size() >= 2) {
+ if (params.get(0).equals("-c")) {
+ assemblerClass = params.get(1);
+ params.remove(0);
+ params.remove(0);
+ }
+ else if (params.get(0).equals("-p")) {
+ parallelism = Integer.parseInt(params.get(1));
+ params.remove(0);
+ params.remove(0);
+ }
+ else {
+ break;
+ }
+ }
+
+ // create the plan
+ String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
+ PackagedProgram program;
+ FlinkPlan optPlan;
+ Client client;
+
+ try {
+ if (assemblerClass == null) {
+ program = new PackagedProgram(jarFile, options);
+ } else {
+ program = new PackagedProgram(jarFile, assemblerClass, options);
+ }
+
+ client = new Client(config, program.getUserCodeClassLoader());
+
+ optPlan = client.getOptimizedPlan(program, parallelism);
+
+ if (optPlan == null) {
+ throw new Exception("The optimized plan could not be produced.");
+ }
+ }
+ catch (ProgramInvocationException e) {
+ // collect the stack trace
+ StringWriter sw = new StringWriter();
+ PrintWriter w = new PrintWriter(sw);
+
+ if (e.getCause() == null) {
+ e.printStackTrace(w);
+ } else {
+ e.getCause().printStackTrace(w);
+ }
+
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
+
+ showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
+ + e.getMessage() + "<br/>"
+ + "<br/><br/><pre>" + message + "</pre>");
+ return;
+ }
+ catch (CompilerException cex) {
+ // collect the stack trace
+ StringWriter sw = new StringWriter();
+ PrintWriter w = new PrintWriter(sw);
+ cex.printStackTrace(w);
+
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
+
+ showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
+ + cex.getMessage() + "<br/>"
+ + (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
+ + "<br/><br/><pre>" + message + "</pre>");
+ return;
+ }
+ catch (Throwable t) {
+ // collect the stack trace
+ StringWriter sw = new StringWriter();
+ PrintWriter w = new PrintWriter(sw);
+ t.printStackTrace(w);
+
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
+
+ showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
+ + message + "</pre>");
+ return;
+ }
+
+ // redirect according to our options
+ if (show) {
+ // we have a request to show the plan
+
+ // create a UID for the job
+ Long uid;
+ do {
+ uid = Math.abs(this.rand.nextLong());
+ } while (this.submittedJobs.containsKey(uid));
+
+ // dump the job to a JSON file
+ String planName = uid + ".json";
+ File jsonFile = new File(this.planDumpDirectory, planName);
+
+ if (optPlan instanceof StreamingPlan) {
+ ((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
+ }
+ else {
+ PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+ jsonGen.setEncodeForHTML(true);
+ jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
+ }
+
+ // submit the job only, if it should not be suspended
+ if (!suspend) {
+ if (optPlan instanceof OptimizedPlan) {
+ try {
+ client.run(program, (OptimizedPlan) optPlan, false);
+ }
+ catch (Throwable t) {
+ LOG.error("Error submitting job to the job-manager.", t);
+ showErrorPage(resp, t.getMessage());
+ return;
+ }
+ finally {
+ program.deleteExtractedLibraries();
+ }
+ }
+ else {
+ throw new RuntimeException("Not implemented for Streaming Job plans");
+ }
+ }
+ else {
+ try {
+ this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
+ }
+ catch (ProgramInvocationException piex) {
+ LOG.error("Error creating JobGraph from optimized plan.", piex);
+ showErrorPage(resp, piex.getMessage());
+ return;
+ }
+ catch (Throwable t) {
+ LOG.error("Error creating JobGraph from optimized plan.", t);
+ showErrorPage(resp, t.getMessage());
+ return;
+ }
+ }
+
+ // redirect to the plan display page
+ resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
+ }
+ else {
+ // don't show any plan. directly submit the job and redirect to the
+ // runtime monitor
+ try {
+ client.run(program, parallelism, false);
+ }
+ catch (Exception ex) {
+ LOG.error("Error submitting job to the job-manager.", ex);
+ // HACK: Is necessary because Message contains whole stack trace
+ String errorMessage = ex.getMessage().split("\n")[0];
+ showErrorPage(resp, errorMessage);
+ return;
+ }
+ finally {
+ program.deleteExtractedLibraries();
+ }
+ resp.sendRedirect(START_PAGE_URL);
+ }
+ }
+ else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
+ // --------------- run a job that has been submitted earlier, but was -------------------
+ // --------------- not executed because of a plan display -------------------
+
+ String id = req.getParameter("id");
+ if (checkParameterSet(resp, id, "id")) {
+ return;
+ }
+
+ Long uid = null;
+ try {
+ uid = Long.parseLong(id);
+ } catch (NumberFormatException nfex) {
+ showErrorPage(resp, "An invalid id for the job was provided.");
+ return;
+ }
+
+ // get the retained job
+ JobGraph job = submittedJobs.remove(uid);
+ if (job == null) {
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ "No job with the given uid was retained for later submission.");
+ return;
+ }
+
+ // submit the job
+ try {
+ Client client = new Client(config, getClass().getClassLoader());
+ client.run(job, false);
+ }
+ catch (Exception ex) {
+ LOG.error("Error submitting job to the job-manager.", ex);
+ resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ // HACK: Is necessary because Message contains whole stack trace
+ String errorMessage = ex.getMessage().split("\n")[0];
+ resp.getWriter().print(errorMessage);
+ // resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
+ return;
+ }
+
+ // redirect to the start page
+ resp.sendRedirect(START_PAGE_URL);
+ } else if (action.equals(ACTION_BACK_VALUE)) {
+ // remove the job from the map
+
+ String id = req.getParameter("id");
+ if (checkParameterSet(resp, id, "id")) {
+ return;
+ }
+
+ Long uid = null;
+ try {
+ uid = Long.parseLong(id);
+ } catch (NumberFormatException nfex) {
+ showErrorPage(resp, "An invalid id for the job was provided.");
+ return;
+ }
+
+ // remove the retained job
+ submittedJobs.remove(uid);
+
+ // redirect to the start page
+ resp.sendRedirect(START_PAGE_URL);
+ } else {
+ showErrorPage(resp, "Invalid action specified.");
+ return;
+ }
+ }
+
+ /**
+ * Prints the error page, containing the given message.
+ *
+ * @param resp
+ * The response handler.
+ * @param message
+ * The message to display.
+ * @throws IOException
+ * Thrown, if the error page could not be printed due to an I/O problem.
+ */
+ private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.setContentType(GUIServletStub.CONTENT_TYPE_HTML);
+
+ PrintWriter writer = resp.getWriter();
+
+ writer
+ .println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
+ writer.println("<html>");
+ writer.println("<head>");
+ writer.println(" <title>Launch Job - Error</title>");
+ writer.println(" <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
+ writer.println(" <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
+ writer.println("</head>");
+
+ writer.println("<body>");
+ writer.println(" <div class=\"mainHeading\">");
+ writer.println(" <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
+ writer.println(" </div>");
+ writer.println(" <div style=\"margin-top: 50px; text-align: center;\">");
+ writer.println(" <p class=\"error_text\" style=\"font-size: 18px;\">");
+ writer.println(message);
+ writer.println(" </p><br/><br/>");
+ writer.println(" <form action=\"launch.html\" method=\"GET\">");
+ writer.println(" <input type=\"submit\" value=\"back\">");
+ writer.println(" </form>");
+ writer.println(" </div>");
+ writer.println("</body>");
+ writer.println("</html>");
+ }
+
+ /**
+ * Checks the given parameter. If it is null, it prints the error page.
+ *
+ * @param resp
+ * The response handler.
+ * @param parameter
+ * The parameter to check.
+ * @param parameterName
+ * The name of the parameter, to describe it in the error message.
+ * @return True, if the parameter is null, false otherwise.
+ * @throws IOException
+ * Thrown, if the error page could not be printed.
+ */
+ private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName)
+ throws IOException {
+ if (parameter == null) {
+ showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
+ * turns them into an array of Strings. Other than the <tt>StringTokenizer</tt>, this method
+ * takes care of quotes, such that quoted passages end up being one string.
+ *
+ * @param args
+ * The string to be split.
+ * @return The array of split strings.
+ */
+ private static final List<String> tokenizeArguments(String args) {
+ List<String> list = new ArrayList<String>();
+ StringBuilder curr = new StringBuilder();
+
+ int pos = 0;
+ boolean quoted = false;
+
+ while (pos < args.length()) {
+ char c = args.charAt(pos);
+ if ((c == ' ' || c == '\t') && !quoted) {
+ if (curr.length() > 0) {
+ list.add(curr.toString());
+ curr.setLength(0);
+ }
+ } else if (c == '"') {
+ quoted = !quoted;
+ } else {
+ curr.append(c);
+ }
+
+ pos++;
+ }
+
+ if (quoted) {
+ throw new IllegalArgumentException("Unterminated quoted string.");
+ }
+
+ if (curr.length() > 0) {
+ list.add(curr.toString());
+ }
+
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index b3c9cc4..cee13dd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -1,212 +1,230 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.FileUploadException;
-import org.apache.commons.fileupload.disk.DiskFileItemFactory;
-import org.apache.commons.fileupload.servlet.ServletFileUpload;
-
-/**
- * A servlet that accepts uploads of pact programs, returns a listing of the
- * directory containing the job jars, and deleting these jars.
- */
-public class JobsServlet extends HttpServlet {
- /**
- * Serial UID for serialization interoperability.
- */
- private static final long serialVersionUID = -1373210261957434273L;
-
- // ------------------------------------------------------------------------
-
- private static final String ACTION_PARAM_NAME = "action";
-
- private static final String ACTION_LIST_VALUE = "list";
-
- private static final String ACTION_DELETE_VALUE = "delete";
-
- private static final String FILENAME_PARAM_NAME = "filename";
-
- private static final String CONTENT_TYPE_PLAIN = "text/plain";
-
- private static final Comparator<File> FILE_SORTER = new Comparator<File>() {
- @Override
- public int compare(File o1, File o2) {
- return o1.getName().compareTo(o2.getName());
- }
-
- };
-
- // ------------------------------------------------------------------------
-
- /**
- * The file object for the directory for temporary files.
- */
- private final File tmpDir;
-
- /**
- * The file object for the directory for the submitted jars.
- */
- private final File destinationDir;
-
- /**
- * The name of the file to redirect to after the upload.
- */
- private final String targetPage;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new <tt>JobServlet</tt>, configured with the given directories
- * and the given result page.
- *
- * @param jobsDir
- * The directory to store uploaded jobs in.
- * @param tmpDir
- * The directory for temporary files.
- * @param targetPage
- * The page to redirect to after a successful upload.
- */
- public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
- this.tmpDir = tmpDir;
- this.destinationDir = jobsDir;
- this.targetPage = targetPage;
- }
-
- // ------------------------------------------------------------------------
-
- public void init(ServletConfig config) throws ServletException {
- super.init(config);
-
- if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
- throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
- }
-
- if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
- throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
- }
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- String action = req.getParameter(ACTION_PARAM_NAME);
-
- if (action.equals(ACTION_LIST_VALUE)) {
- GregorianCalendar cal = new GregorianCalendar();
-
- File[] files = destinationDir.listFiles();
- Arrays.<File> sort(files, FILE_SORTER);
-
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType(CONTENT_TYPE_PLAIN);
-
- PrintWriter writer = resp.getWriter();
- for (int i = 0; i < files.length; i++) {
- if (!files[i].getName().endsWith(".jar")) {
- continue;
- }
-
- cal.setTimeInMillis(files[i].lastModified());
- writer.println(files[i].getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
- + cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
- + cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
- + cal.get(GregorianCalendar.SECOND));
- }
- } else if (action.equals(ACTION_DELETE_VALUE)) {
- String filename = req.getParameter(FILENAME_PARAM_NAME);
-
- if (filename == null || filename.length() == 0) {
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- } else {
- File f = new File(destinationDir, filename);
- if (!f.exists() || f.isDirectory()) {
- resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
- }
- f.delete();
- resp.setStatus(HttpServletResponse.SC_OK);
- }
- } else {
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- }
- }
-
- @Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- // check, if we are doing the right request
- if (!ServletFileUpload.isMultipartContent(req)) {
- resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
- return;
- }
-
- // create the disk file factory, limiting the file size to 20 MB
- DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
- fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
- fileItemFactory.setRepository(tmpDir);
-
- String filename = null;
-
- // parse the request
- ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
- try {
- Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
-
- // go over the form fields and look for our file
- while (itr.hasNext()) {
- FileItem item = itr.next();
- if (!item.isFormField()) {
- if (item.getFieldName().equals("upload_jar_file")) {
-
- // found the file, store it to the specified location
- filename = item.getName();
- File file = new File(destinationDir, filename);
- item.write(file);
- break;
- }
- }
- }
- } catch (FileUploadException ex) {
- resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
- return;
- } catch (Exception ex) {
- resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
- "An unknown error occurred during the file upload.");
- return;
- }
-
- // write the okay message
- resp.sendRedirect(targetPage);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.fileupload.FileItem;
+import org.apache.commons.fileupload.FileUploadException;
+import org.apache.commons.fileupload.disk.DiskFileItemFactory;
+import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.flink.client.program.PackagedProgram;
+
+/**
+ * A servlet that accepts uploads of program jars, returns a listing of the
+ * directory containing the job jars, and deletes these jars.
+ * <p>
+ * GET requests are dispatched on the <tt>action</tt> parameter:
+ * <ul>
+ * <li><tt>list</tt> prints one line per jar file in the jobs directory, formatted as
+ * <tt>name TAB month/day/year hour:minute:second [TAB entry classes]</tt>. The entry
+ * classes are taken verbatim from the jar's manifest and are omitted if the manifest
+ * declares none.</li>
+ * <li><tt>delete</tt> removes the jar named by the <tt>filename</tt> parameter.</li>
+ * </ul>
+ * POST requests upload a jar through the multipart field <tt>upload_jar_file</tt>.
+ * <p>
+ * File names supplied by the client are only accepted if they denote a file directly
+ * inside the jobs directory.
+ */
+public class JobsServlet extends HttpServlet {
+	/**
+	 * Serial UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = -1373210261957434273L;
+
+	// ------------------------------------------------------------------------
+
+	private static final String ACTION_PARAM_NAME = "action";
+
+	private static final String ACTION_LIST_VALUE = "list";
+
+	private static final String ACTION_DELETE_VALUE = "delete";
+
+	private static final String FILENAME_PARAM_NAME = "filename";
+
+	private static final String CONTENT_TYPE_PLAIN = "text/plain";
+
+	/**
+	 * Orders files by their name.
+	 */
+	private static final Comparator<File> FILE_SORTER = new Comparator<File>() {
+		@Override
+		public int compare(File o1, File o2) {
+			return o1.getName().compareTo(o2.getName());
+		}
+
+	};
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The file object for the directory for temporary files.
+	 */
+	private final File tmpDir;
+
+	/**
+	 * The file object for the directory for the submitted jars.
+	 */
+	private final File destinationDir;
+
+	/**
+	 * The name of the file to redirect to after the upload.
+	 */
+	private final String targetPage;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new <tt>JobServlet</tt>, configured with the given directories
+	 * and the given result page.
+	 *
+	 * @param jobsDir
+	 *        The directory to store uploaded jobs in.
+	 * @param tmpDir
+	 *        The directory for temporary files.
+	 * @param targetPage
+	 *        The page to redirect to after a successful upload.
+	 */
+	public JobsServlet(File jobsDir, File tmpDir, String targetPage) {
+		this.tmpDir = tmpDir;
+		this.destinationDir = jobsDir;
+		this.targetPage = targetPage;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Initializes the servlet and verifies that both working directories are usable.
+	 *
+	 * @throws ServletException
+	 *         Thrown, if the temp directory or the jobs directory is not a writable directory.
+	 */
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		super.init(config);
+
+		if (!(tmpDir.isDirectory() && tmpDir.canWrite())) {
+			throw new ServletException(tmpDir.getAbsolutePath() + " is not a writable directory");
+		}
+
+		if (!(destinationDir.isDirectory() && destinationDir.canWrite())) {
+			throw new ServletException(destinationDir.getAbsolutePath() + " is not a writable directory");
+		}
+	}
+
+	/**
+	 * Handles the <tt>list</tt> and <tt>delete</tt> actions. A missing or unknown
+	 * action is answered with status 400.
+	 */
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String action = req.getParameter(ACTION_PARAM_NAME);
+
+		// constant-first comparison: a request without 'action' must yield 400, not a NullPointerException
+		if (ACTION_LIST_VALUE.equals(action)) {
+			listJars(resp);
+		} else if (ACTION_DELETE_VALUE.equals(action)) {
+			deleteJar(req.getParameter(FILENAME_PARAM_NAME), resp);
+		} else {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+		}
+	}
+
+	/**
+	 * Stores the jar uploaded in the multipart field <tt>upload_jar_file</tt> in the jobs
+	 * directory and redirects to the target page.
+	 */
+	@Override
+	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		// check, if we are doing the right request
+		if (!ServletFileUpload.isMultipartContent(req)) {
+			resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		// Items of up to 20 MB are kept in memory, larger ones are spooled to the temp
+		// directory. This is a buffering threshold only, it does not limit the upload size.
+		DiskFileItemFactory fileItemFactory = new DiskFileItemFactory();
+		fileItemFactory.setSizeThreshold(20 * 1024 * 1024); // 20 MB
+		fileItemFactory.setRepository(tmpDir);
+
+		// parse the request
+		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
+		try {
+			@SuppressWarnings("unchecked") // older FileUpload versions return a raw List
+			List<FileItem> items = (List<FileItem>) uploadHandler.parseRequest(req);
+
+			// go over the form fields and look for our file
+			for (FileItem item : items) {
+				if (item.isFormField() || !"upload_jar_file".equals(item.getFieldName())) {
+					continue;
+				}
+
+				// The name is client controlled: some browsers send a full path, and a
+				// malicious client may try to escape the jobs directory.
+				String filename = stripClientPath(item.getName());
+				File target = (filename == null || filename.length() == 0) ? null : resolveInJobsDir(filename);
+				if (target == null) {
+					resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid file name.");
+					return;
+				}
+
+				// found the file, store it to the specified location
+				item.write(target);
+				break;
+			}
+		} catch (FileUploadException ex) {
+			resp.sendError(HttpServletResponse.SC_NOT_ACCEPTABLE, "Invalid Fileupload.");
+			return;
+		} catch (Exception ex) {
+			log("Storing the uploaded jar file failed.", ex);
+			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+				"An unknown error occurred during the file upload.");
+			return;
+		}
+
+		// write the okay message
+		resp.sendRedirect(targetPage);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the listing of all jar files in the jobs directory to the response.
+	 */
+	private void listJars(HttpServletResponse resp) throws IOException {
+		File[] files = destinationDir.listFiles();
+		if (files == null) {
+			// listFiles() returns null if the directory vanished or cannot be read
+			resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+				"The jobs directory could not be listed.");
+			return;
+		}
+		Arrays.<File> sort(files, FILE_SORTER);
+
+		resp.setStatus(HttpServletResponse.SC_OK);
+		resp.setContentType(CONTENT_TYPE_PLAIN);
+
+		GregorianCalendar cal = new GregorianCalendar();
+		PrintWriter writer = resp.getWriter();
+		for (File file : files) {
+			if (!file.getName().endsWith(".jar")) {
+				continue;
+			}
+
+			String entryClasses = readEntryClasses(file);
+			String entryClassColumn = (entryClasses == null) ? "" : '\t' + entryClasses;
+
+			cal.setTimeInMillis(file.lastModified());
+			writer.println(file.getName() + '\t' + (cal.get(GregorianCalendar.MONTH) + 1) + '/'
+				+ cal.get(GregorianCalendar.DAY_OF_MONTH) + '/' + cal.get(GregorianCalendar.YEAR) + ' '
+				+ cal.get(GregorianCalendar.HOUR_OF_DAY) + ':' + cal.get(GregorianCalendar.MINUTE) + ':'
+				+ cal.get(GregorianCalendar.SECOND) + entryClassColumn);
+		}
+	}
+
+	/**
+	 * Deletes the given jar from the jobs directory and sets the response status:
+	 * 400 for a missing or illegal name, 404 if no such file exists, 500 if the
+	 * file could not be removed, and 200 on success.
+	 */
+	private void deleteJar(String filename, HttpServletResponse resp) throws IOException {
+		if (filename == null || filename.length() == 0) {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		File f = resolveInJobsDir(filename);
+		if (f == null) {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		if (!f.exists() || f.isDirectory()) {
+			// return here, otherwise the status would be overwritten below
+			resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
+			return;
+		}
+
+		resp.setStatus(f.delete() ? HttpServletResponse.SC_OK : HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+	}
+
+	/**
+	 * Reads the entry class declaration from the manifest of the given jar. The
+	 * assembler class attribute takes precedence over the main class attribute.
+	 *
+	 * @return The attribute value as found in the manifest, or <tt>null</tt>, if the jar
+	 *         has no manifest, declares neither attribute, or cannot be read.
+	 */
+	private String readEntryClasses(File jarFile) {
+		JarFile jar = null;
+		try {
+			jar = new JarFile(jarFile);
+			Manifest manifest = jar.getManifest();
+			if (manifest == null) {
+				return null;
+			}
+
+			String entryClasses = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+			if (entryClasses == null) {
+				entryClasses = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+			}
+			return entryClasses;
+		} catch (IOException ex) {
+			// A single broken upload must not make the whole listing unavailable;
+			// the file is still listed (without entry classes) so that it can be deleted.
+			log("Could not read the manifest of '" + jarFile.getName() + "'.", ex);
+			return null;
+		} finally {
+			// JarFile keeps the file open until closed; try/finally instead of
+			// try-with-resources keeps the class compilable for Java 6
+			if (jar != null) {
+				try {
+					jar.close();
+				} catch (IOException ignored) {
+					// nothing sensible can be done if closing a read-only jar fails
+				}
+			}
+		}
+	}
+
+	/**
+	 * Resolves a client supplied file name against the jobs directory.
+	 *
+	 * @return The canonical file, or <tt>null</tt>, if the name does not denote a file
+	 *         located directly inside the jobs directory (e.g., it contains "..").
+	 */
+	private File resolveInJobsDir(String filename) throws IOException {
+		File candidate = new File(destinationDir, filename).getCanonicalFile();
+		File parent = candidate.getParentFile();
+		if (parent != null && parent.equals(destinationDir.getCanonicalFile())) {
+			return candidate;
+		}
+		return null;
+	}
+
+	/**
+	 * Strips the directory part that some browsers include in the name of an uploaded
+	 * file. Both '/' and '\' are treated as separators, independent of the server platform.
+	 *
+	 * @return The part after the last separator, or <tt>null</tt> for a <tt>null</tt> name.
+	 */
+	private static String stripClientPath(String clientName) {
+		if (clientName == null) {
+			return null;
+		}
+		int lastSeparator = Math.max(clientName.lastIndexOf('/'), clientName.lastIndexOf('\\'));
+		return clientName.substring(lastSeparator + 1);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
index cdef605..019fc50 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
@@ -44,6 +44,8 @@ public class PactJobJSONServlet extends HttpServlet {
private static final String JOB_PARAM_NAME = "job";
+ private static final String CLASS_PARAM_NAME = "assemblerClass";
+
// ------------------------------------------------------------------------
private final File jobStoreDirectory; // the directory in which the jobs are stored
@@ -74,7 +76,7 @@ public class PactJobJSONServlet extends HttpServlet {
// create the pact plan
PackagedProgram pactProgram;
try {
- pactProgram = new PackagedProgram(jarFile, new String[0]);
+ pactProgram = new PackagedProgram(jarFile, req.getParameter(CLASS_PARAM_NAME), new String[0]);
}
catch (Throwable t) {
LOG.info("Instantiating the PactProgram for '" + jarFile.getName() + "' failed.", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/7e75c7b3/flink-clients/src/main/resources/web-docs/js/program.js
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/resources/web-docs/js/program.js b/flink-clients/src/main/resources/web-docs/js/program.js
index c443dab..619deb8 100644
--- a/flink-clients/src/main/resources/web-docs/js/program.js
+++ b/flink-clients/src/main/resources/web-docs/js/program.js
@@ -1,21 +1,21 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
var maxColumnWidth = 200;
var minColumnWidth = 100;
@@ -77,6 +77,7 @@ function toggleCheckboxes(box)
$('.jobItemCheckbox').attr('checked', false);
box.attr('checked', true);
var id = box.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+ var assemblerClass = box.attr('id');
$('#mainCanvas').html('');
$('#planDescription').html('');
@@ -85,7 +86,7 @@ function toggleCheckboxes(box)
$.ajax({
type: "GET",
url: "pactPlan",
- data: { job: id },
+ data: { job: id, assemblerClass: assemblerClass},
success: function(response) { showPreviewPlan(response); }
});
}
@@ -154,34 +155,35 @@ function createJobList(data)
{
if (lines[i] == null || lines[i].length == 0) {
continue;
-
}
- var name = null;
- var date = null;
- var tabIx = lines[i].indexOf("\t");
-
- if (tabIx > 0) {
- name = lines[i].substr(0, tabIx);
- if (tabIx < lines[i].length - 1) {
- date = lines[i].substr(tabIx + 1);
- }
- else {
- date = "unknown date";
+ var name = lines[i];
+ var date = "unknown date";
+ var assemblerClass = "<em>no entry class specified</em>";
+
+ var tokens = lines[i].split("\t");
+ if (tokens.length > 0) {
+ name = tokens[0];
+ if (tokens.length > 1) {
+ date = tokens[1];
+ if (tokens.length > 2) {
+ assemblerClass = tokens[2];
+ }
}
}
- else {
- name = lines[i];
- date = "unknown date";
- }
+ var classes = assemblerClass.split(",");
markup += '<div id="job_' + name + '" class="JobListItems"><table class="table"><tr>';
- markup += '<td width="30px;"><input class="jobItemCheckbox" type="checkbox"></td>';
- markup += '<td><p class="JobListItemsName">' + name + '</p></td>';
+ markup += '<td colspan="2"><p class="JobListItemsName">' + name + '</p></td>';
markup += '<td><p class="JobListItemsDate">' + date + '</p></td>';
- markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td>';
- markup += '</tr></table></div>';
+ markup += '<td width="30px"><img class="jobItemDeleteIcon" src="img/delete-icon.png" width="24" height="24" /></td></tr>';
+
+ for (var idx in classes) {
+ markup += '<tr><td width="30px;"><input id="' + classes[idx] + '" class="jobItemCheckbox" type="checkbox"></td>';
+ markup += '<td colspan="3"><p class="JobListItemsDate">' + classes[idx] + '</p></td></tr>';
+ }
+ markup += '</table></div>';
}
// add the contents
@@ -207,11 +209,18 @@ function runJob ()
}
var jobName = job.parentsUntil('.JobListItems').parent().attr('id').substr(4);
+ var assemblerClass = job.attr('id');
+
var showPlan = $('#showPlanCheck').is(':checked');
var suspendPlan = $('#suspendJobDuringPlanCheck').is(':checked');
var args = $('#commandLineArgsField').attr('value'); //TODO? Replace with .val() ?
- var url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+ var url;
+ if (assemblerClass == "<em>no entry class specified</em>") {
+ url = "runJob?" + $.param({ action: "submit", job: jobName, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+ } else {
+ url = "runJob?" + $.param({ action: "submit", job: jobName, assemblerClass: assemblerClass, arguments: args, show_plan: showPlan, suspend: suspendPlan});
+ }
window.location = url;
}