You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/10/16 12:43:11 UTC

[flink] branch master updated (46f8b59 -> 59dd855)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 46f8b59  [FLINK-14365][tests] Annotate MiniCluster tests in core modules with AlsoRunWithSchedulerNG
     new d632018  [FLINK-14290] Add Pipeline as a common base class of DataSet and DataStream plans
     new 8092daf  [FLINK-14290] Add Pipeline translation utils for getting a JobGraph from a FlinkPipeline
     new fa872eb  [FLINK-14290] Change DataSet PlanExecutors to use the new pipeline translation util
     new 55eb4b2  [FLINK-14290] Add support for StreamGraph to pipeline translation util
     new b47c13c  [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment
     new 14b19dd  [FLINK-14290] Add SavepointRestoreSettings to StreamGraph and Generators
     new 85e5a77  [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment
     new 3cdf06b  [FLINK-14290] Use PipelineTranslationUtil and client.submitJob() in StreamContextEnvironment
     new ee8f930  [hotfix] Fix formatting/checkstyle in PlanExecutor
     new eafdb64  [FLINK-14290] Move jars and classpath out of executors and add to execute() method
     new 98b54e0  [FLINK-14391] Factor out translator discovery in FlinkPipelineTranslationUtil
     new 70a8e0c  [FLINK-14391] Add JobID setter in JobGraph
     new 7084e07  [FLINK-14391] Add JSON execution graph generation to PipelineTranslationUtil
     new 835e776  [FLINK-14391] Remove JobID parameter from exception in RemoteStreamEnvironment
     new 59dd855  [FLINK-14391] Remove FlinkPlan as common base class of OptimizerPlan and StreamGraph

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/client/ClientUtils.java  |  16 +++
 .../flink/client/FlinkPipelineTranslationUtil.java |  98 ++++++++++++++++++
 .../flink/client/FlinkPipelineTranslator.java      |  31 +++---
 .../org/apache/flink/client/LocalExecutor.java     | 111 +++++++++-----------
 .../org/apache/flink/client/PlanTranslator.java    |  99 ++++++++++++++++++
 .../org/apache/flink/client/RemoteExecutor.java    |  68 ++++++------
 .../org/apache/flink/client/cli/CliFrontend.java   |  20 +---
 .../apache/flink/client/program/ClusterClient.java | 109 --------------------
 .../flink/client/program/ContextEnvironment.java   |  30 +++---
 .../client/program/OptimizerPlanEnvironment.java   |  28 ++---
 .../flink/client/program/PackagedProgramUtils.java |  60 ++++++-----
 .../RemoteExecutorHostnameResolutionTest.java      |   8 +-
 .../client/cli/CliFrontendPackageProgramTest.java  |   7 +-
 .../apache/flink/client/program/ClientTest.java    |  24 +++--
 .../client/program/ExecutionPlanCreationTest.java  |   4 +-
 .../java/org/apache/flink/api/common/Plan.java     |   3 +-
 .../org/apache/flink/api/common/PlanExecutor.java  | 114 +++++++++++----------
 .../{common/Archiveable.java => dag/Pipeline.java} |  13 +--
 .../apache/flink/api/java/LocalEnvironment.java    |   7 +-
 .../apache/flink/api/java/RemoteEnvironment.java   |   4 +-
 .../org/apache/flink/optimizer/plan/FlinkPlan.java |  28 -----
 .../apache/flink/optimizer/plan/OptimizedPlan.java |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java    |   9 +-
 .../api/java/ScalaShellRemoteEnvironment.java      |   4 +-
 .../org/apache/flink/api/java/FlinkILoopTest.java  |  13 +--
 .../api/environment/LocalStreamEnvironment.java    |  50 ++-------
 .../api/environment/RemoteStreamEnvironment.java   |  24 ++---
 .../api/environment/StreamContextEnvironment.java  |  20 +++-
 .../api/environment/StreamPlanEnvironment.java     |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  28 +++--
 .../streaming/api/graph/StreamGraphGenerator.java  |   9 +-
 .../streaming/api/graph/StreamGraphTranslator.java |  69 +++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java      |   2 +
 .../RemoteStreamExecutionEnvironmentTest.java      |  10 +-
 .../api/graph/StreamGraphGeneratorTest.java        |  18 ++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  20 +++-
 .../client/gateway/local/ExecutionContext.java     |  40 +++++---
 .../test/example/client/LocalExecutorITCase.java   |   3 +-
 38 files changed, 695 insertions(+), 510 deletions(-)
 create mode 100644 flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
 rename flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java => flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java (53%)
 create mode 100644 flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
 copy flink-core/src/main/java/org/apache/flink/api/{common/Archiveable.java => dag/Pipeline.java} (86%)
 delete mode 100644 flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java


[flink] 09/15: [hotfix] Fix formatting/checkstyle in PlanExecutor

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee8f9309d64e1438aef74e3a4376a0576f104412
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Tue Sep 24 10:23:22 2019 +0200

    [hotfix] Fix formatting/checkstyle in PlanExecutor
---
 .../org/apache/flink/api/common/PlanExecutor.java  | 117 +++++++++++----------
 1 file changed, 63 insertions(+), 54 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 83a8e17..7c90383 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -29,15 +29,15 @@ import java.util.List;
 /**
  * A PlanExecutor executes a Flink program's dataflow plan. All Flink programs are translated to
  * dataflow plans prior to execution.
- * 
+ *
  * <p>The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
- * The concrete implementations of the executors are loaded dynamically, because they depend on
- * the full set of all runtime classes.</p>
- * 
+ * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow. The
+ * concrete implementations of the executors are loaded dynamically, because they depend on the full
+ * set of all runtime classes.</p>
+ *
  * <p>PlanExecutors can be started explicitly, in which case they keep running until stopped. If
- * a program is submitted to a plan executor that is not running, it will start up for that
- * program, and shut down afterwards.</p>
+ * a program is submitted to a plan executor that is not running, it will start up for that program,
+ * and shut down afterwards.</p>
  */
 @Internal
 public abstract class PlanExecutor {
@@ -48,19 +48,19 @@ public abstract class PlanExecutor {
 	// ------------------------------------------------------------------------
 	//  Program Execution
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Execute the given program.
-	 * 
+	 *
 	 * <p>If the executor has not been started before, then this method will start the
-	 * executor and stop it after the execution has completed. This implies that one needs
-	 * to explicitly start the executor for all programs where multiple dataflow parts
-	 * depend on each other. Otherwise, the previous parts will no longer
-	 * be available, because the executor immediately shut down after the execution.</p>
-	 * 
+	 * executor and stop it after the execution has completed. This implies that one needs to
+	 * explicitly start the executor for all programs where multiple dataflow parts depend on each
+	 * other. Otherwise, the previous parts will no longer be available, because the executor
+	 * immediately shut down after the execution.</p>
+	 *
 	 * @param plan The plan of the program to execute.
-	 * @return The execution result, containing for example the net runtime of the program, and the accumulators.
-	 * 
+	 * @return The execution result, containing for example the net runtime of the program, and the
+	 * 		accumulators.
 	 * @throws Exception Thrown, if job submission caused an exception.
 	 */
 	public abstract JobExecutionResult executePlan(Pipeline plan) throws Exception;
@@ -68,77 +68,86 @@ public abstract class PlanExecutor {
 	// ------------------------------------------------------------------------
 	//  Executor Factories
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates an executor that runs the plan locally in a multi-threaded environment.
-	 * 
+	 *
 	 * @return A local executor.
 	 */
 	public static PlanExecutor createLocalExecutor(Configuration configuration) {
 		Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
-		
+
 		try {
 			return leClass.getConstructor(Configuration.class).newInstance(configuration);
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("An error occurred while loading the local executor ("
-					+ LOCAL_EXECUTOR_CLASS + ").", t);
+		} catch (Throwable t) {
+			throw new RuntimeException(
+					"An error occurred while loading the local executor (" + LOCAL_EXECUTOR_CLASS + ").",
+					t);
 		}
 	}
 
 	/**
-	 * Creates an executor that runs the plan on a remote environment. The remote executor is typically used
-	 * to send the program to a cluster for execution.
+	 * Creates an executor that runs the plan on a remote environment. The remote executor is
+	 * typically used to send the program to a cluster for execution.
 	 *
 	 * @param hostname The address of the JobManager to send the program to.
 	 * @param port The port of the JobManager to send the program to.
 	 * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
-	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
-	 *                 from within the UDFs.
-	 * @param globalClasspaths A list of URLs that are added to the classpath of each user code classloader of the
-	 *                 program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
+	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes
+	 * 		and all classes used from within the UDFs.
+	 * @param globalClasspaths A list of URLs that are added to the classpath of each user code
+	 * 		classloader of the program. Paths must specify a protocol (e.g. file://) and be
+	 * 		accessible
+	 * 		on all nodes.
 	 * @return A remote executor.
 	 */
-	public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
-			List<URL> jarFiles, List<URL> globalClasspaths) {
+	public static PlanExecutor createRemoteExecutor(
+			String hostname,
+			int port,
+			Configuration clientConfiguration,
+			List<URL> jarFiles,
+			List<URL> globalClasspaths) {
 		if (hostname == null) {
 			throw new IllegalArgumentException("The hostname must not be null.");
 		}
 		if (port <= 0 || port > 0xffff) {
 			throw new IllegalArgumentException("The port value is out of range.");
 		}
-		
+
 		Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
-		
-		List<URL> files = (jarFiles == null) ?
-				Collections.<URL>emptyList() : jarFiles;
-		List<URL> paths = (globalClasspaths == null) ?
-				Collections.<URL>emptyList() : globalClasspaths;
+
+		List<URL> files = (jarFiles == null) ? Collections.<URL>emptyList() : jarFiles;
+		List<URL> paths = (globalClasspaths == null) ? Collections.<URL>emptyList() :
+				globalClasspaths;
 
 		try {
-			return (clientConfiguration == null) ?
-					reClass.getConstructor(String.class, int.class, List.class)
-						.newInstance(hostname, port, files) :
-					reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
-						.newInstance(hostname, port, clientConfiguration, files, paths);
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("An error occurred while loading the remote executor ("
-					+ REMOTE_EXECUTOR_CLASS + ").", t);
+			return (clientConfiguration == null) ? reClass
+					.getConstructor(String.class, int.class, List.class)
+					.newInstance(hostname, port, files) : reClass
+					.getConstructor(String.class,
+							int.class,
+							Configuration.class,
+							List.class,
+							List.class)
+					.newInstance(hostname, port, clientConfiguration, files, paths);
+		} catch (Throwable t) {
+			throw new RuntimeException(
+					"An error occurred while loading the remote executor (" + REMOTE_EXECUTOR_CLASS + ").",
+					t);
 		}
 	}
-	
+
 	private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
 		try {
 			Class<?> leClass = Class.forName(className);
 			return leClass.asSubclass(PlanExecutor.class);
-		}
-		catch (ClassNotFoundException cnfe) {
-			throw new RuntimeException("Could not load the executor class (" + className
-					+ "). Do you have the 'flink-clients' project in your dependencies?");
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
+		} catch (ClassNotFoundException cnfe) {
+			throw new RuntimeException("Could not load the executor class (" + className + "). Do " +
+					"you have the 'flink-clients' project in your dependencies?");
+		} catch (Throwable t) {
+			throw new RuntimeException(
+					"An error occurred while loading the executor (" + className + ").",
+					t);
 		}
 	}
 }


[flink] 15/15: [FLINK-14391] Remove FlinkPlan as common base class of OptimizerPlan and StreamGraph

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59dd855628052c369b64c71edc1018ed378e8eec
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri Oct 11 14:20:39 2019 +0200

    [FLINK-14391] Remove FlinkPlan as common base class of OptimizerPlan and StreamGraph
    
    We also need to change/simplify some translation logic because of this.
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  20 +---
 .../apache/flink/client/program/ClusterClient.java | 109 ---------------------
 .../client/program/OptimizerPlanEnvironment.java   |  28 ++----
 .../flink/client/program/PackagedProgramUtils.java |  60 ++++++------
 .../client/cli/CliFrontendPackageProgramTest.java  |   7 +-
 .../apache/flink/client/program/ClientTest.java    |  24 +++--
 .../client/program/ExecutionPlanCreationTest.java  |   4 +-
 .../org/apache/flink/optimizer/plan/FlinkPlan.java |  28 ------
 .../apache/flink/optimizer/plan/OptimizedPlan.java |   2 +-
 .../apache/flink/optimizer/plan/StreamingPlan.java |  45 ---------
 .../api/environment/StreamPlanEnvironment.java     |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  14 ++-
 .../client/gateway/local/ExecutionContext.java     |  40 +++++---
 13 files changed, 102 insertions(+), 281 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c4d5fa9..552eccc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
@@ -40,13 +42,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -335,15 +330,8 @@ public class CliFrontend {
 
 			LOG.info("Creating program plan dump");
 
-			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
-			FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);
-
-			String jsonPlan = null;
-			if (flinkPlan instanceof OptimizedPlan) {
-				jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
-			} else if (flinkPlan instanceof StreamingPlan) {
-				jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON();
-			}
+			Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism);
+			String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
 
 			if (jsonPlan != null) {
 				System.out.println("----------------------- Execution Plan -----------------------");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8179403..c0ea516 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -21,22 +21,10 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
@@ -49,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
@@ -65,9 +52,6 @@ public abstract class ClusterClient<T> implements AutoCloseable {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
-	/** The optimizer used in the optimization of batch programs. */
-	final Optimizer compiler;
-
 	/** Configuration of the client. */
 	private final Configuration flinkConfig;
 
@@ -94,7 +78,6 @@ public abstract class ClusterClient<T> implements AutoCloseable {
 	 */
 	public ClusterClient(Configuration flinkConfig) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
 	}
 
 	/**
@@ -111,45 +94,6 @@ public abstract class ClusterClient<T> implements AutoCloseable {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Access to the Program's Plan
-	// ------------------------------------------------------------------------
-
-	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException {
-		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
-	}
-
-	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException {
-		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-		try {
-			Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-
-			// temporary hack to support the optimizer plan preview
-			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
-			if (parallelism > 0) {
-				env.setParallelism(parallelism);
-			}
-			return env.getOptimizedPlan(prog);
-		} finally {
-			Thread.currentThread().setContextClassLoader(contextClassLoader);
-		}
-	}
-
-	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
-		Logger log = LoggerFactory.getLogger(ClusterClient.class);
-
-		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
-			p.setDefaultParallelism(parallelism);
-		}
-		log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
-
-		return compiler.compile(p);
-	}
-
-	// ------------------------------------------------------------------------
 	//  Program submission / execution
 	// ------------------------------------------------------------------------
 
@@ -193,28 +137,6 @@ public abstract class ClusterClient<T> implements AutoCloseable {
 		}
 	}
 
-	public JobSubmissionResult run(
-		Plan plan,
-		List<URL> libraries,
-		List<URL> classpaths,
-		ClassLoader classLoader,
-		int parallelism,
-		SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException {
-
-		OptimizedPlan optPlan = getOptimizedPlan(compiler, plan, parallelism);
-		return run(optPlan, libraries, classpaths, classLoader, savepointSettings);
-	}
-
-	public JobSubmissionResult run(
-		FlinkPlan compiledPlan,
-		List<URL> libraries,
-		List<URL> classpaths,
-		ClassLoader classLoader,
-		SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
-		JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
-		return submitJob(job, classLoader);
-	}
-
 	/**
 	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
 	 */
@@ -294,37 +216,6 @@ public abstract class ClusterClient<T> implements AutoCloseable {
 	public abstract Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception;
 
 	// ------------------------------------------------------------------------
-	//  Internal translation methods
-	// ------------------------------------------------------------------------
-
-	public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
-		return getJobGraph(flinkConfig, optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
-	}
-
-	public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
-		JobGraph job;
-		if (optPlan instanceof StreamingPlan) {
-			job = ((StreamingPlan) optPlan).getJobGraph();
-			job.setSavepointRestoreSettings(savepointSettings);
-		} else {
-			JobGraphGenerator gen = new JobGraphGenerator(flinkConfig);
-			job = gen.compileJobGraph((OptimizedPlan) optPlan);
-		}
-
-		for (URL jar : jarFiles) {
-			try {
-				job.addJar(new Path(jar.toURI()));
-			} catch (URISyntaxException e) {
-				throw new RuntimeException("URL is invalid. This should not happen.", e);
-			}
-		}
-
-		job.setClasspaths(classpaths);
-
-		return job;
-	}
-
-	// ------------------------------------------------------------------------
 	//  Abstract methods to be implemented by the cluster specific Client
 	// ------------------------------------------------------------------------
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index d2801ac..edf7d36 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -19,27 +19,20 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
 /**
- * An {@link ExecutionEnvironment} that never executes a job but only creates the optimized plan.
+ * An {@link ExecutionEnvironment} that never executes a job but only extracts the {@link
+ * org.apache.flink.api.dag.Pipeline}.
  */
 public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 
-	private final Optimizer compiler;
-
-	private FlinkPlan optimizerPlan;
-
-	public OptimizerPlanEnvironment(Optimizer compiler) {
-		this.compiler = compiler;
-	}
+	private Pipeline pipeline;
 
 	// ------------------------------------------------------------------------
 	//  Execution Environment methods
@@ -47,14 +40,13 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		Plan plan = createProgramPlan(jobName);
-		this.optimizerPlan = compiler.compile(plan);
+		this.pipeline = createProgramPlan();
 
 		// do not go on with anything now!
 		throw new ProgramAbortException();
 	}
 
-	public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
+	public Pipeline getPipeline(PackagedProgram prog) throws ProgramInvocationException {
 
 		// temporarily write syserr and sysout to a byte array.
 		PrintStream originalOut = System.out;
@@ -73,8 +65,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		}
 		catch (Throwable t) {
 			// the invocation gets aborted with the preview plan
-			if (optimizerPlan != null) {
-				return optimizerPlan;
+			if (pipeline != null) {
+				return pipeline;
 			} else {
 				throw new ProgramInvocationException("The program caused an error: ", t);
 			}
@@ -112,8 +104,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 
 	// ------------------------------------------------------------------------
 
-	public void setPlan(FlinkPlan plan){
-		this.optimizerPlan = plan;
+	public void setPipeline(Pipeline pipeline){
+		this.pipeline = pipeline;
 	}
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index d9a10e6..fa9f8b0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -19,22 +19,15 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
-import java.net.URISyntaxException;
-import java.net.URL;
-
 /**
  * Utility class for {@link PackagedProgram} related operations.
  */
@@ -56,33 +49,21 @@ public class PackagedProgramUtils {
 			Configuration configuration,
 			int defaultParallelism,
 			@Nullable JobID jobID) throws ProgramInvocationException {
+
 		Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
-		final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
 
-		final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
+		final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment();
 		optimizerPlanEnvironment.setParallelism(defaultParallelism);
+		final Pipeline pipeline = optimizerPlanEnvironment.getPipeline(packagedProgram);
 
-		final FlinkPlan flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
-
-		final JobGraph jobGraph;
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism);
 
-		if (flinkPlan instanceof StreamingPlan) {
-			jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
-			jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
-		} else {
-			final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
-			jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
-		}
-
-		for (URL url : packagedProgram.getAllLibraries()) {
-			try {
-				jobGraph.addJar(new Path(url.toURI()));
-			} catch (URISyntaxException e) {
-				throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e);
-			}
+		if (jobID != null) {
+			jobGraph.setJobID(jobID);
 		}
-
+		ClientUtils.addJarFiles(jobGraph, packagedProgram.getAllLibraries());
 		jobGraph.setClasspaths(packagedProgram.getClasspaths());
+		jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 
 		return jobGraph;
 	}
@@ -104,5 +85,22 @@ public class PackagedProgramUtils {
 		return createJobGraph(packagedProgram, configuration, defaultParallelism, null);
 	}
 
+	public static Pipeline getPipelineFromProgram(PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException {
+		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+		try {
+			Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+
+			// temporary hack to support the optimizer plan preview
+			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment();
+			if (parallelism > 0) {
+				env.setParallelism(parallelism);
+			}
+			return env.getPipeline(prog);
+		} finally {
+			Thread.currentThread().setContextClassLoader(contextClassLoader);
+		}
+	}
+
 	private PackagedProgramUtils() {}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
index 48c8891..873ba00 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
@@ -284,7 +286,8 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
 
 			// we expect this to fail with a "ClassNotFoundException"
-			ClusterClient.getOptimizedPlanAsJson(compiler, prog, 666);
+			Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666);
+			FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
 			fail("Should have failed with a ClassNotFoundException");
 		}
 		catch (ProgramInvocationException e) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index b31f95c..634ebf0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -36,7 +38,7 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
@@ -156,13 +158,16 @@ public class ClientTest extends TestLogger {
 	public void shouldSubmitToJobClient() throws Exception {
 		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
 		clusterClient.setDetached(true);
-		JobSubmissionResult result = clusterClient.run(
-			plan,
-			Collections.emptyList(),
-			Collections.emptyList(),
-			getClass().getClassLoader(),
-			1,
-			SavepointRestoreSettings.none());
+
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+				plan,
+				new Configuration(),
+				1);
+
+		ClientUtils.addJarFiles(jobGraph, Collections.emptyList());
+		jobGraph.setClasspaths(Collections.emptyList());
+
+		JobSubmissionResult result = clusterClient.submitJob(jobGraph, getClass().getClassLoader());
 
 		assertNotNull(result);
 	}
@@ -198,7 +203,8 @@ public class ClientTest extends TestLogger {
 		PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
 
 		Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-		OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
+		Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1);
+		OptimizedPlan op = optimizer.compile(plan);
 		assertNotNull(op);
 
 		PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index d26b0d0..1b52f37 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -59,7 +60,8 @@ public class ExecutionPlanCreationTest {
 			config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());
 
 			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-			OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
+			Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1);
+			OptimizedPlan op = optimizer.compile(plan);
 			assertNotNull(op);
 
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
deleted file mode 100644
index d146c83..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-/**
- * A common interface for compiled Flink plans for both batch and streaming
- * processing programs.
- * 
- */
-public interface FlinkPlan {
-
-}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
index 311c286..3e8f2f0 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.Visitor;
  * all operator strategies (sorting-merge join, hash join, sorted grouping, ...),
  * and the data exchange modes (batched, pipelined).</p>
  */
-public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
+public class OptimizedPlan implements Visitable<PlanNode>  {
 	
 	/** The data sources in the plan. */
 	private final Collection<SourcePlanNode> dataSources;
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
deleted file mode 100644
index f3fe632..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import javax.annotation.Nullable;
-
-/**
- * Abstract class representing Flink Streaming plans.
- */
-public abstract class StreamingPlan implements FlinkPlan {
-
-	/**
-	 * Gets the assembled {@link JobGraph} with a random {@link JobID}.
-	 */
-	@SuppressWarnings("deprecation")
-	public JobGraph getJobGraph() {
-		return getJobGraph(null);
-	}
-
-	/**
-	 * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
-	 */
-	public abstract JobGraph getJobGraph(@Nullable JobID jobID);
-
-	public abstract String getStreamingPlanAsJSON();
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index bb1a4cf..54ef3e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -57,7 +57,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 		transformations.clear();
 
 		if (env instanceof OptimizerPlanEnvironment) {
-			((OptimizerPlanEnvironment) env).setPlan(streamGraph);
+			((OptimizerPlanEnvironment) env).setPipeline(streamGraph);
 		}
 
 		throw new OptimizerPlanEnvironment.ProgramAbortException();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ec6992f..a8b322f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
@@ -73,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  */
 @Internal
-public class StreamGraph extends StreamingPlan implements Pipeline {
+public class StreamGraph implements Pipeline {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
 
@@ -727,14 +726,19 @@ public class StreamGraph extends StreamingPlan implements Pipeline {
 	}
 
 	/**
-	 * Gets the assembled {@link JobGraph} with a given job id.
+	 * Gets the assembled {@link JobGraph} with a random {@link JobID}.
+	 */
+	public JobGraph getJobGraph() {
+		return getJobGraph(null);
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
 	 */
-	@Override
 	public JobGraph getJobGraph(@Nullable JobID jobID) {
 		return StreamingJobGraphGenerator.createJobGraph(this, jobID);
 	}
 
-	@Override
 	public String getStreamingPlanAsJSON() {
 		try {
 			return new JSONGenerator(this).getJSON();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 27c3ee2..73c9cce 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -21,19 +21,17 @@ package org.apache.flink.table.client.gateway.local;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -440,16 +438,29 @@ public class ExecutionContext<T> {
 		}
 
 		public JobGraph createJobGraph(String name) {
-			final FlinkPlan plan = createPlan(name, flinkConfig);
-			return ClusterClient.getJobGraph(
-				flinkConfig,
-				plan,
-				dependencies,
-				runOptions.getClasspaths(),
-				runOptions.getSavepointRestoreSettings());
+			final Pipeline pipeline = createPipeline(name, flinkConfig);
+
+			int parallelism;
+			if (execEnv != null) {
+				parallelism = execEnv.getParallelism();
+			} else if (streamExecEnv != null) {
+				parallelism = streamExecEnv.getParallelism();
+			} else {
+				throw new RuntimeException("No execution environment defined.");
+			}
+			JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+					pipeline,
+					flinkConfig,
+					parallelism);
+
+			ClientUtils.addJarFiles(jobGraph, dependencies);
+			jobGraph.setClasspaths(runOptions.getClasspaths());
+			jobGraph.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings());
+
+			return jobGraph;
 		}
 
-		private FlinkPlan createPlan(String name, Configuration flinkConfig) {
+		private Pipeline createPipeline(String name, Configuration flinkConfig) {
 			if (streamExecEnv != null) {
 				// special case for Blink planner to apply batch optimizations
 				// note: it also modifies the ExecutionConfig!
@@ -461,8 +472,7 @@ public class ExecutionContext<T> {
 				final int parallelism = execEnv.getParallelism();
 				final Plan unoptimizedPlan = execEnv.createProgramPlan();
 				unoptimizedPlan.setJobName(name);
-				final Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
-				return ClusterClient.getOptimizedPlan(compiler, unoptimizedPlan, parallelism);
+				return unoptimizedPlan;
 			}
 		}
 


[flink] 14/15: [FLINK-14391] Remove JobID parameter from exception in RemoteStreamEnvironment

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 835e776a9038448333a05b6e08e7f036ae2c74be
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri Oct 11 14:25:58 2019 +0200

    [FLINK-14391] Remove JobID parameter from exception in RemoteStreamEnvironment
    
    Creating a JobGraph from a StreamGraph using this method creates a
    random JobID that doesn't give any information.
---
 .../flink/streaming/api/environment/RemoteStreamEnvironment.java    | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 1de834a..2574b8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -267,8 +267,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
 		}
 		catch (Exception e) {
-			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
-				streamGraph.getJobGraph().getJobID(), e);
+			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
 		}
 
 		if (savepointRestoreSettings != null) {
@@ -288,8 +287,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		}
 		catch (Exception e) {
 			String term = e.getMessage() == null ? "." : (": " + e.getMessage());
-			throw new ProgramInvocationException("The program execution failed" + term,
-				streamGraph.getJobGraph().getJobID(), e);
+			throw new ProgramInvocationException("The program execution failed" + term, e);
 		}
 		finally {
 			try {


[flink] 08/15: [FLINK-14290] Use PipelineTranslationUtil and client.submitJob() in StreamContextEnvironment

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3cdf06b25b7ea9794768bf718d253bf222aa15b0
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 19 16:42:43 2019 +0200

    [FLINK-14290] Use PipelineTranslationUtil and client.submitJob() in StreamContextEnvironment
---
 .../api/environment/StreamContextEnvironment.java    | 20 +++++++++++++++++---
 .../RemoteStreamExecutionEnvironmentTest.java        | 10 +++++-----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 6ee4541..b2db19b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,7 +19,10 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 /**
@@ -43,9 +46,20 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 
-		// execute the programs
-		return ctx.getClient()
-				.run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+				streamGraph,
+				ctx.getClient().getFlinkConfiguration(),
+				getParallelism());
+
+		ClientUtils.addJarFiles(jobGraph, ctx.getJars());
+		jobGraph.setClasspaths(ctx.getClasspaths());
+
+		// running from the CLI will override the savepoint restore settings
+		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
+
+		return ctx
+				.getClient()
+				.submitJob(jobGraph, ctx.getUserCodeClassLoader())
 				.getJobExecutionResult();
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
index b30c338..acfa605 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
@@ -39,7 +40,8 @@ import static org.mockito.Mockito.when;
  * Tests for the {@link RemoteStreamEnvironment}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({RemoteStreamEnvironment.class})
+// TODO: I don't like that I have to do this
+@PrepareForTest({RemoteStreamEnvironment.class, RemoteExecutor.class})
 public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
 
 	/**
@@ -53,8 +55,7 @@ public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
 		JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);
 
 		RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
-		when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
-			.thenReturn(expectedResult);
+		when(mockedClient.submitJob(Mockito.any(), Mockito.any())).thenReturn(expectedResult);
 
 		PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation) -> {
 				Object[] args = invocation.getArguments();
@@ -85,8 +86,7 @@ public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
 		JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);
 
 		PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenReturn(mockedClient);
-		when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(restoreSettings)))
-			.thenReturn(expectedResult);
+		when(mockedClient.submitJob(Mockito.any(), Mockito.any())).thenReturn(expectedResult);
 
 		JobExecutionResult actualResult = env.execute("fakeJobName");
 		Assert.assertEquals(expectedResult, actualResult);


[flink] 01/15: [FLINK-14290] Add Pipeline as a common base class of DataSet and DataStream plans

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d632018ed915a8ca7c5f7b180e023a87cfae9a21
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Wed Sep 11 17:36:23 2019 +0200

    [FLINK-14290] Add Pipeline as a common base class of DataSet and DataStream plans
    
    For now, only the DataSet Plan implements this; as a follow-up, StreamGraph
    should also implement this.
---
 .../java/org/apache/flink/api/common/Plan.java     |  3 ++-
 .../java/org/apache/flink/api/dag/Pipeline.java    | 28 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 0937842..0e71bcd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
@@ -46,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * operators of the data flow can be reached via backwards traversal</p>.
  */
 @Internal
-public class Plan implements Visitable<Operator<?>> {
+public class Plan implements Visitable<Operator<?>>, Pipeline {
 
 	/**
 	 * A collection of all sinks in the plan. Since the plan is traversed from the sinks to the
diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Pipeline.java b/flink-core/src/main/java/org/apache/flink/api/dag/Pipeline.java
new file mode 100644
index 0000000..ab75e03
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Pipeline.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.dag;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Common interface for Flink pipelines.
+ */
+@Internal
+public interface Pipeline {}


[flink] 03/15: [FLINK-14290] Change DataSet PlanExecutors to use the new pipeline translation util

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa872ebfe47bc46c0afba19091caf84f2c29a7b9
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Wed Sep 11 17:38:31 2019 +0200

    [FLINK-14290] Change DataSet PlanExecutors to use the new pipeline translation util
    
    This decouples JobGraph generation from the executors and environments.
---
 .../java/org/apache/flink/client/ClientUtils.java  | 16 ++++
 .../org/apache/flink/client/LocalExecutor.java     | 96 ++++++++++------------
 .../org/apache/flink/client/RemoteExecutor.java    | 34 +++++---
 .../flink/client/program/ContextEnvironment.java   | 30 ++++---
 .../org/apache/flink/api/common/PlanExecutor.java  |  3 +-
 .../org/apache/flink/api/java/FlinkILoopTest.java  |  4 +-
 6 files changed, 106 insertions(+), 77 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index ee03705..9fb4ce5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,6 +35,20 @@ import java.util.jar.JarFile;
 public enum ClientUtils {
 	;
 
+	/**
+	 * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. This will
+	 * throw an exception if a jar URL is not valid.
+	 */
+	public static void addJarFiles(JobGraph jobGraph, List<URL> jarFilesToAttach) {
+		for (URL jar : jarFilesToAttach) {
+			try {
+				jobGraph.addJar(new Path(jar.toURI()));
+			} catch (URISyntaxException e) {
+				throw new RuntimeException("URL is invalid. This should not happen.", e);
+			}
+		}
+	}
+
 	public static void checkJarFile(URL jar) throws IOException {
 		File jarFile;
 		try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index d9444d5..290cff9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -21,26 +21,25 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
+import java.util.Collections;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
  *
- * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method,
+ * <p>By simply calling the {@link #executePlan(Pipeline)} method,
  * this executor still start up and shut down again immediately after the program finished.</p>
  *
  * <p>To use this executor to execute many dataflow programs that constitute one job together,
@@ -59,22 +58,29 @@ public class LocalExecutor extends PlanExecutor {
 		this.baseConfiguration = checkNotNull(conf);
 	}
 
-	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
+	private JobExecutorService createJobExecutorService(
+			JobGraph jobGraph, Configuration configuration) throws Exception {
 		if (!configuration.contains(RestOptions.BIND_PORT)) {
 			configuration.setString(RestOptions.BIND_PORT, "0");
 		}
 
-		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(configuration)
-			.setNumTaskManagers(
-				configuration.getInteger(
-					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-					ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-			.setRpcServiceSharing(RpcServiceSharing.SHARED)
-			.setNumSlotsPerTaskManager(
-				configuration.getInteger(
-					TaskManagerOptions.NUM_TASK_SLOTS, 1))
-			.build();
+		int numTaskManagers = configuration.getInteger(
+				ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+				ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+		// we have to use the maximum parallelism as a default here, otherwise streaming
+		// pipelines would not run
+		int numSlotsPerTaskManager = configuration.getInteger(
+				TaskManagerOptions.NUM_TASK_SLOTS,
+				jobGraph.getMaximumParallelism());
+
+		final MiniClusterConfiguration miniClusterConfiguration =
+				new MiniClusterConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumTaskManagers(numTaskManagers)
+				.setRpcServiceSharing(RpcServiceSharing.SHARED)
+				.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build();
 
 		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
 		miniCluster.start();
@@ -91,50 +97,38 @@ public class LocalExecutor extends PlanExecutor {
 	 * after the job finished. If the job runs in session mode, the executor is kept alive until
 	 * no more references to the executor exist.</p>
 	 *
-	 * @param plan The plan of the program to execute.
+	 * @param pipeline The pipeline of the program to execute.
 	 * @return The net runtime of the program, in milliseconds.
 	 *
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
 	@Override
-	public JobExecutionResult executePlan(Plan plan) throws Exception {
-		checkNotNull(plan);
-
-		final Configuration jobExecutorServiceConfiguration = configureExecution(plan);
-
-		try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) {
+	public JobExecutionResult executePlan(Pipeline pipeline) throws Exception {
+		checkNotNull(pipeline);
+
+		// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
+		// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
+		// for now.
+		if (pipeline instanceof Plan) {
+			Plan plan = (Plan) pipeline;
+			final int slotsPerTaskManager = baseConfiguration.getInteger(
+					TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
+			final int numTaskManagers = baseConfiguration.getInteger(
+					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+			plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
+		}
 
-			Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
-			OptimizedPlan op = pc.compile(plan);
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline,
+				baseConfiguration,
+				1);
 
-			JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
-			JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+		jobGraph.setAllowQueuedScheduling(true);
 
+		try (final JobExecutorService executorService = createJobExecutorService(jobGraph,
+				baseConfiguration)) {
 			return executorService.executeJobBlocking(jobGraph);
 		}
 	}
-
-	private Configuration configureExecution(final Plan plan) {
-		final Configuration executorConfiguration = createExecutorServiceConfig(plan);
-		setPlanParallelism(plan, executorConfiguration);
-		return executorConfiguration;
-	}
-
-	private Configuration createExecutorServiceConfig(final Plan plan) {
-		final Configuration newConfiguration = new Configuration();
-		newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
-		newConfiguration.addAll(baseConfiguration);
-		return newConfiguration;
-	}
-
-	private void setPlanParallelism(final Plan plan, final Configuration executorServiceConfig) {
-		// TODO: Set job's default parallelism to max number of slots
-		final int slotsPerTaskManager = executorServiceConfig.getInteger(
-				TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
-		final int numTaskManagers = executorServiceConfig.getInteger(
-				ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-		plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
-	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 4d0dfd9..d78a04f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -19,14 +19,14 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
@@ -114,19 +114,31 @@ public class RemoteExecutor extends PlanExecutor {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public JobExecutionResult executePlan(Plan plan) throws Exception {
+	public JobExecutionResult executePlan(Pipeline plan) throws Exception {
 		checkNotNull(plan);
 
-		try (ClusterClient<?> client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
-			ClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());
-
-			return client.run(
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
 				plan,
+				clientConfiguration,
+				getDefaultParallelism());
+
+		ClientUtils.addJarFiles(jobGraph, jarFiles);
+		jobGraph.setClasspaths(globalClasspaths);
+
+		ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
 				jarFiles,
-				globalClasspaths,
-				classLoader,
-				defaultParallelism,
-				SavepointRestoreSettings.none()).getJobExecutionResult();
+				this.globalClasspaths,
+				getClass().getClassLoader());
+
+		return executePlanWithJars(jobGraph, userCodeClassLoader);
+	}
+
+	private JobExecutionResult executePlanWithJars(JobGraph jobGraph, ClassLoader classLoader) throws Exception {
+		checkNotNull(jobGraph);
+		checkNotNull(classLoader);
+
+		try (ClusterClient<?>  client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
+			return client.submitJob(jobGraph, classLoader).getJobExecutionResult();
 		}
 	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 4d46595..0ceb850 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,9 +21,11 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.net.URL;
@@ -64,17 +66,21 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
 
-		final Plan plan = createProgramPlan(jobName);
-		final JobSubmissionResult jobSubmissionResult = client.run(
-			plan,
-			jarFilesToAttach,
-			classpathsToAttach,
-			userCodeClassLoader,
-			getParallelism(),
-			savepointSettings);
-
-		lastJobExecutionResult = jobSubmissionResult.getJobExecutionResult();
-		return lastJobExecutionResult;
+		Plan plan = createProgramPlan(jobName);
+
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+				plan,
+				client.getFlinkConfiguration(),
+				getParallelism());
+
+		ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach);
+		jobGraph.setClasspaths(this.classpathsToAttach);
+
+		this.lastJobExecutionResult = client
+				.submitJob(jobGraph, this.userCodeClassLoader)
+				.getJobExecutionResult();
+
+		return this.lastJobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 2095c63..83a8e17 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.URL;
@@ -62,7 +63,7 @@ public abstract class PlanExecutor {
 	 * 
 	 * @throws Exception Thrown, if job submission caused an exception.
 	 */
-	public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
+	public abstract JobExecutionResult executePlan(Pipeline plan) throws Exception;
 
 	// ------------------------------------------------------------------------
 	//  Executor Factories
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index eddf66c..5b46c54 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.java;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
@@ -128,7 +128,7 @@ public class FlinkILoopTest extends TestLogger {
 		private List<String> globalClasspaths;
 
 		@Override
-		public JobExecutionResult executePlan(Plan plan) throws Exception {
+		public JobExecutionResult executePlan(Pipeline plan) throws Exception {
 			return null;
 		}
 


[flink] 04/15: [FLINK-14290] Add support for StreamGraph to pipeline translation util

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 55eb4b254eb531287ec84c63874b479d42f8e315
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 12 13:37:18 2019 +0200

    [FLINK-14290] Add support for StreamGraph to pipeline translation util
---
 .../flink/client/FlinkPipelineTranslationUtil.java | 38 ++++++++++++++++++-
 .../flink/streaming/api/graph/StreamGraph.java     |  3 +-
 .../streaming/api/graph/StreamGraphTranslator.java | 43 +++++++++++++---------
 3 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
index 61bda14..33c8027 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -47,6 +47,42 @@ public final class FlinkPipelineTranslationUtil {
 					defaultParallelism);
 		}
 
-		throw new RuntimeException("Cannot find transmogrifier for given pipeline: " + pipeline);
+		FlinkPipelineTranslator streamGraphTranslator = reflectStreamGraphTranslator();
+
+		if (!streamGraphTranslator.canTranslate(pipeline)) {
+			throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
+					+ "the given pipeline " + pipeline + ".");
+		}
+
+		return streamGraphTranslator.translate(pipeline,
+				optimizerConfiguration,
+				defaultParallelism);
+	}
+
+	private static FlinkPipelineTranslator reflectStreamGraphTranslator() {
+		// Try our luck with StreamGraph translation. We have to load a StreamGraphTranslator
+		// via reflection because the dependencies of flink-streaming-java are inverted compared
+		// to flink-java. For flink-java does not depend on runtime, clients or optimizer and
+		// we have the translation code in clients/optimizer. On the other hand,
+		// flink-streaming-java depends on runtime and clients.
+
+		Class<?> streamGraphTranslatorClass;
+		try {
+			streamGraphTranslatorClass = Class.forName(
+					"org.apache.flink.streaming.api.graph.StreamGraphTranslator",
+					true,
+					FlinkPipelineTranslationUtil.class.getClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not load StreamGraphTranslator.", e);
+		}
+
+		FlinkPipelineTranslator streamGraphTranslator;
+		try {
+			streamGraphTranslator =
+					(FlinkPipelineTranslator) streamGraphTranslatorClass.newInstance();
+		} catch (InstantiationException | IllegalAccessException e) {
+			throw new RuntimeException("Could not instantiate StreamGraphTranslator.", e);
+		}
+		return streamGraphTranslator;
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index f4741fb..522c6fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -71,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  */
 @Internal
-public class StreamGraph extends StreamingPlan {
+public class StreamGraph extends StreamingPlan implements Pipeline {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
similarity index 52%
copy from flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
index 61bda14..ebd554f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
@@ -17,36 +17,43 @@
  *
  */
 
-package org.apache.flink.client;
+package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * Utility for transforming {@link Pipeline FlinkPipelines} into a {@link JobGraph}. This uses
- * reflection or service discovery to find the right {@link FlinkPipelineTranslator} for a given
- * subclass of {@link Pipeline}.
+ * {@link FlinkPipelineTranslator} for DataStream API {@link StreamGraph StreamGraphs}.
+ *
+ * <p>Note: this is used through reflection in
+ * {@link org.apache.flink.client.FlinkPipelineTranslationUtil}.
  */
-public final class FlinkPipelineTranslationUtil {
+@SuppressWarnings("unused")
+public class StreamGraphTranslator implements FlinkPipelineTranslator {
 
-	/**
-	 * Transmogrifies the given {@link Pipeline} to a {@link JobGraph}.
-	 */
-	public static JobGraph getJobGraph(
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphTranslator.class);
+
+	@Override
+	public JobGraph translate(
 			Pipeline pipeline,
 			Configuration optimizerConfiguration,
 			int defaultParallelism) {
+		checkArgument(pipeline instanceof StreamGraph,
+				"Given pipeline is not a DataStream StreamGraph.");
 
-		PlanTranslator planToJobGraphTransmogrifier = new PlanTranslator();
-
-		if (planToJobGraphTransmogrifier.canTranslate(pipeline)) {
-			// we have a DataSet program
-			return planToJobGraphTransmogrifier.translate(pipeline,
-					optimizerConfiguration,
-					defaultParallelism);
-		}
+		StreamGraph streamGraph = (StreamGraph) pipeline;
+		return streamGraph.getJobGraph();
+	}
 
-		throw new RuntimeException("Cannot find transmogrifier for given pipeline: " + pipeline);
+	@Override
+	public boolean canTranslate(Pipeline pipeline) {
+		return pipeline instanceof StreamGraph;
 	}
 }


[flink] 13/15: [FLINK-14391] Add JSON execution graph generation to PipelineTranslationUtil

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7084e07eaba0d967cb2474bb4ea9e80da5cca46e
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri Oct 11 14:17:43 2019 +0200

    [FLINK-14391] Add JSON execution graph generation to PipelineTranslationUtil
---
 .../flink/client/FlinkPipelineTranslationUtil.java    | 10 +++++++++-
 .../apache/flink/client/FlinkPipelineTranslator.java  |  8 +++++++-
 .../java/org/apache/flink/client/PlanTranslator.java  | 19 ++++++++++++++++++-
 .../streaming/api/graph/StreamGraphTranslator.java    | 14 ++++++++++++--
 4 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
index e54a428..f88c69d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -40,11 +40,19 @@ public final class FlinkPipelineTranslationUtil {
 
 		FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
 
-		return pipelineTranslator.translate(pipeline,
+		return pipelineTranslator.translateToJobGraph(pipeline,
 				optimizerConfiguration,
 				defaultParallelism);
 	}
 
+	/**
+	 * Extracts the execution plan (as JSON) from the given {@link Pipeline}.
+	 */
+	public static String translateToJSONExecutionPlan(Pipeline pipeline) {
+		FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+		return pipelineTranslator.translateToJSONExecutionPlan(pipeline);
+	}
+
 	private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
 		PlanTranslator planToJobGraphTransmogrifier = new PlanTranslator();
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java
index c224422..fa3bc39 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java
@@ -33,10 +33,16 @@ public interface FlinkPipelineTranslator {
 	 * Creates a {@link JobGraph} from the given {@link Pipeline} and attaches the given jar
 	 * files and classpaths to the {@link JobGraph}.
 	 */
-	JobGraph translate(
+	JobGraph translateToJobGraph(
 			Pipeline pipeline,
 			Configuration optimizerConfiguration,
 			int defaultParallelism);
 
+
+	/**
+	 * Extracts the execution plan (as JSON) from the given {@link Pipeline}.
+	 */
+	String translateToJSONExecutionPlan(Pipeline pipeline);
+
 	boolean canTranslate(Pipeline pipeline);
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
index ae18b6b..43c6bb7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
@@ -41,7 +43,7 @@ public class PlanTranslator implements FlinkPipelineTranslator {
 	private static final Logger LOG = LoggerFactory.getLogger(PlanTranslator.class);
 
 	@Override
-	public JobGraph translate(
+	public JobGraph translateToJobGraph(
 			Pipeline pipeline,
 			Configuration optimizerConfiguration,
 			int defaultParallelism) {
@@ -67,6 +69,21 @@ public class PlanTranslator implements FlinkPipelineTranslator {
 				plan.getDefaultParallelism());
 	}
 
+	@Override
+	public String translateToJSONExecutionPlan(Pipeline pipeline) {
+		checkArgument(pipeline instanceof Plan, "Given pipeline is not a DataSet Plan.");
+
+		Plan plan = (Plan) pipeline;
+
+		Optimizer opt = new Optimizer(
+				new DataStatistics(),
+				new DefaultCostEstimator(),
+				new Configuration());
+		OptimizedPlan optPlan = opt.compile(plan);
+
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
+	}
+
 	private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
 		Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
 		OptimizedPlan optimizedPlan = optimizer.compile(plan);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
index ebd554f..4a43adb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
@@ -41,7 +41,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphTranslator.class);
 
 	@Override
-	public JobGraph translate(
+	public JobGraph translateToJobGraph(
 			Pipeline pipeline,
 			Configuration optimizerConfiguration,
 			int defaultParallelism) {
@@ -49,7 +49,17 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
 				"Given pipeline is not a DataStream StreamGraph.");
 
 		StreamGraph streamGraph = (StreamGraph) pipeline;
-		return streamGraph.getJobGraph();
+		return streamGraph.getJobGraph(null);
+	}
+
+	@Override
+	public String translateToJSONExecutionPlan(Pipeline pipeline) {
+		checkArgument(pipeline instanceof StreamGraph,
+				"Given pipeline is not a DataStream StreamGraph.");
+
+		StreamGraph streamGraph = (StreamGraph) pipeline;
+
+		return streamGraph.getStreamingPlanAsJSON();
 	}
 
 	@Override


[flink] 05/15: [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b47c13c00be47d3a9132761134261d80295920b4
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 12 13:37:39 2019 +0200

    [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment
---
 .../api/environment/LocalStreamEnvironment.java    | 48 ++--------------------
 1 file changed, 4 insertions(+), 44 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f655af6..54f4354 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,18 +20,11 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nonnull;
 
 /**
@@ -45,8 +38,6 @@ import javax.annotation.Nonnull;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
-
 	private final Configuration configuration;
 
 	/**
@@ -83,42 +74,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-		JobGraph jobGraph = streamGraph.getJobGraph();
-		jobGraph.setAllowQueuedScheduling(true);
-
-		Configuration configuration = new Configuration();
-		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "0");
-
-		// add (and override) the settings with what the user defined
-		configuration.addAll(this.configuration);
-
-		if (!configuration.contains(RestOptions.BIND_PORT)) {
-			configuration.setString(RestOptions.BIND_PORT, "0");
-		}
-
-		int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
-
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setConfiguration(configuration)
-			.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
-			.build();
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running job on local embedded Flink mini cluster");
-		}
-
-		MiniCluster miniCluster = new MiniCluster(cfg);
-
 		try {
-			miniCluster.start();
-			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
-
-			return miniCluster.executeJobBlocking(jobGraph);
-		}
-		finally {
+			final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
+			return executor.executePlan(streamGraph);
+		} finally {
 			transformations.clear();
-			miniCluster.close();
 		}
 	}
 }


[flink] 10/15: [FLINK-14290] Move jars and classpath out of executors and add to execute() method

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eafdb64e40b30ca120e318a62be21107657b5573
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Tue Sep 24 13:34:38 2019 +0200

    [FLINK-14290] Move jars and classpath out of executors and add to execute() method
    
    Before, the executors were potentially storing the jar and classpath
    internally while the jars and classpaths are really a property of the
    pipeline that should be executed. This change reflects that.
---
 .../org/apache/flink/client/LocalExecutor.java     | 23 ++++-------
 .../org/apache/flink/client/RemoteExecutor.java    | 46 ++++++++--------------
 .../RemoteExecutorHostnameResolutionTest.java      |  8 ++--
 .../org/apache/flink/api/common/PlanExecutor.java  | 44 +++++++++------------
 .../apache/flink/api/java/LocalEnvironment.java    |  7 +++-
 .../apache/flink/api/java/RemoteEnvironment.java   |  4 +-
 .../api/java/ScalaShellRemoteEnvironment.java      |  4 +-
 .../org/apache/flink/api/java/FlinkILoopTest.java  | 11 ++----
 .../api/environment/LocalStreamEnvironment.java    |  4 +-
 .../api/environment/RemoteStreamEnvironment.java   |  7 ++--
 .../test/example/client/LocalExecutorITCase.java   |  3 +-
 11 files changed, 67 insertions(+), 94 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 290cff9..d866a4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -32,14 +32,15 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
-import java.util.Collections;
+import java.net.URL;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
  *
- * <p>By simply calling the {@link #executePlan(Pipeline)} method,
+ * <p>By simply calling the {@link #executePlan(Pipeline, List, List)} method,
  * this executor will start up and shut down again immediately after the program finishes.</p>
  *
  * <p>To use this executor to execute many dataflow programs that constitute one job together,
@@ -90,21 +91,11 @@ public class LocalExecutor extends PlanExecutor {
 		return miniCluster;
 	}
 
-	/**
-	 * Executes the given program on a local runtime and waits for the job to finish.
-	 *
-	 * <p>If the executor has not been started before, this starts the executor and shuts it down
-	 * after the job finished. If the job runs in session mode, the executor is kept alive until
-	 * no more references to the executor exist.</p>
-	 *
-	 * @param pipeline The pipeline of the program to execute.
-	 * @return The net runtime of the program, in milliseconds.
-	 *
-	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
-	 *                   caused an exception.
-	 */
 	@Override
-	public JobExecutionResult executePlan(Pipeline pipeline) throws Exception {
+	public JobExecutionResult executePlan(
+			Pipeline pipeline,
+			List<URL> jarFiles,
+			List<URL> globalClasspaths) throws Exception {
 		checkNotNull(pipeline);
 
 		// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index d78a04f..91a5d3a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,35 +46,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class RemoteExecutor extends PlanExecutor {
 
-	private final List<URL> jarFiles;
-
-	private final List<URL> globalClasspaths;
-
 	private final Configuration clientConfiguration;
 
 	private int defaultParallelism = 1;
 
 	public RemoteExecutor(String hostname, int port) {
-		this(hostname, port, new Configuration(), Collections.emptyList(), Collections.emptyList());
+		this(hostname, port, new Configuration());
 	}
 
-	public RemoteExecutor(
-		String hostname,
-		int port,
-		Configuration clientConfiguration,
-		List<URL> jarFiles,
-		List<URL> globalClasspaths) {
-		this(new InetSocketAddress(hostname, port), clientConfiguration, jarFiles, globalClasspaths);
+	public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
+		this(new InetSocketAddress(hostname, port), clientConfiguration);
 	}
 
-	public RemoteExecutor(
-		InetSocketAddress inet,
-		Configuration clientConfiguration,
-		List<URL> jarFiles,
-		List<URL> globalClasspaths) {
+	public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration) {
 		this.clientConfiguration = clientConfiguration;
-		this.jarFiles = jarFiles;
-		this.globalClasspaths = globalClasspaths;
 
 		clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
 		clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
@@ -87,8 +71,8 @@ public class RemoteExecutor extends PlanExecutor {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Sets the parallelism that will be used when neither the program does not define
-	 * any parallelism at all.
+	 * Sets the parallelism that will be used when the program does not define any
+	 * parallelism at all.
 	 *
 	 * @param defaultParallelism The default parallelism for the executor.
 	 */
@@ -100,8 +84,8 @@ public class RemoteExecutor extends PlanExecutor {
 	}
 
 	/**
-	 * Gets the parallelism that will be used when neither the program does not define
-	 * any parallelism at all.
+	 * Gets the parallelism that will be used when the program does not define any
+	 * parallelism at all.
 	 *
 	 * @return The default parallelism for the executor.
 	 */
@@ -114,11 +98,13 @@ public class RemoteExecutor extends PlanExecutor {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public JobExecutionResult executePlan(Pipeline plan) throws Exception {
+	public JobExecutionResult executePlan(
+			Pipeline plan,
+			List<URL> jarFiles,
+			List<URL> globalClasspaths) throws Exception {
 		checkNotNull(plan);
 
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
+		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan,
 				clientConfiguration,
 				getDefaultParallelism());
 
@@ -127,7 +113,7 @@ public class RemoteExecutor extends PlanExecutor {
 
 		ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
 				jarFiles,
-				this.globalClasspaths,
+				globalClasspaths,
 				getClass().getClassLoader());
 
 		return executePlanWithJars(jobGraph, userCodeClassLoader);
@@ -137,7 +123,9 @@ public class RemoteExecutor extends PlanExecutor {
 		checkNotNull(jobGraph);
 		checkNotNull(classLoader);
 
-		try (ClusterClient<?>  client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
+		try (ClusterClient<?> client = new RestClusterClient<>(
+				clientConfiguration,
+				"RemoteExecutor")) {
 			return client.submitJob(jobGraph, classLoader).getJobExecutionResult();
 		}
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 73e99e5..187c345 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
@@ -54,7 +53,7 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 
 		RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 		try {
-			exec.executePlan(getProgram());
+			exec.executePlan(getProgram(), Collections.emptyList(), Collections.emptyList());
 			fail("This should fail with an ProgramInvocationException");
 		}
 		catch (UnknownHostException ignored) {
@@ -66,10 +65,9 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 	public void testUnresolvableHostname2() throws Exception {
 
 		InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
-		RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
-				Collections.<URL>emptyList(), Collections.<URL>emptyList());
+		RemoteExecutor exec = new RemoteExecutor(add, new Configuration());
 		try {
-			exec.executePlan(getProgram());
+			exec.executePlan(getProgram(), Collections.emptyList(), Collections.emptyList());
 			fail("This should fail with an ProgramInvocationException");
 		}
 		catch (UnknownHostException ignored) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 7c90383..2af3874 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.URL;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -59,11 +58,20 @@ public abstract class PlanExecutor {
 	 * immediately shut down after the execution.</p>
 	 *
 	 * @param plan The plan of the program to execute.
+	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes
+	 * 		and all classes used from within the UDFs.
+	 * @param globalClasspaths A list of URLs that are added to the classpath of each user code
+	 * 		classloader of the program. Paths must specify a protocol (e.g. file://) and be
+	 * 		accessible
+	 * 		on all nodes.
 	 * @return The execution result, containing for example the net runtime of the program, and the
 	 * 		accumulators.
 	 * @throws Exception Thrown, if job submission caused an exception.
 	 */
-	public abstract JobExecutionResult executePlan(Pipeline plan) throws Exception;
+	public abstract JobExecutionResult executePlan(
+			Pipeline plan,
+			List<URL> jarFiles,
+			List<URL> globalClasspaths) throws Exception;
 
 	// ------------------------------------------------------------------------
 	//  Executor Factories
@@ -93,20 +101,10 @@ public abstract class PlanExecutor {
 	 * @param hostname The address of the JobManager to send the program to.
 	 * @param port The port of the JobManager to send the program to.
 	 * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
-	 * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes
-	 * 		and all classes used from within the UDFs.
-	 * @param globalClasspaths A list of URLs that are added to the classpath of each user code
-	 * 		classloader of the program. Paths must specify a protocol (e.g. file://) and be
-	 * 		accessible
-	 * 		on all nodes.
 	 * @return A remote executor.
 	 */
 	public static PlanExecutor createRemoteExecutor(
-			String hostname,
-			int port,
-			Configuration clientConfiguration,
-			List<URL> jarFiles,
-			List<URL> globalClasspaths) {
+			String hostname, int port, Configuration clientConfiguration) {
 		if (hostname == null) {
 			throw new IllegalArgumentException("The hostname must not be null.");
 		}
@@ -116,20 +114,14 @@ public abstract class PlanExecutor {
 
 		Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
 
-		List<URL> files = (jarFiles == null) ? Collections.<URL>emptyList() : jarFiles;
-		List<URL> paths = (globalClasspaths == null) ? Collections.<URL>emptyList() :
-				globalClasspaths;
-
 		try {
-			return (clientConfiguration == null) ? reClass
-					.getConstructor(String.class, int.class, List.class)
-					.newInstance(hostname, port, files) : reClass
-					.getConstructor(String.class,
-							int.class,
-							Configuration.class,
-							List.class,
-							List.class)
-					.newInstance(hostname, port, clientConfiguration, files, paths);
+			if (clientConfiguration == null) {
+				return reClass.getConstructor(String.class, int.class).newInstance(hostname, port);
+			} else {
+				return reClass
+						.getConstructor(String.class, int.class, Configuration.class)
+						.newInstance(hostname, port, clientConfiguration);
+			}
 		} catch (Throwable t) {
 			throw new RuntimeException(
 					"An error occurred while loading the remote executor (" + REMOTE_EXECUTOR_CLASS + ").",
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 9ce0946..dc88650 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.Collections;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -73,7 +75,10 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		final Plan p = createProgramPlan(jobName);
 
 		final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
-		lastJobExecutionResult = executor.executePlan(p);
+		lastJobExecutionResult = executor.executePlan(
+				p,
+				Collections.emptyList(),
+				Collections.emptyList());
 		return lastJobExecutionResult;
 	}
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 786add1..9b6add7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -155,8 +155,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		final Plan p = createProgramPlan(jobName);
 
-		final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
-		lastJobExecutionResult = executor.executePlan(p);
+		final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
+		lastJobExecutionResult = executor.executePlan(p, jarFiles, globalClasspaths);
 		return lastJobExecutionResult;
 	}
 
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 6898a99..e545192 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -62,8 +62,8 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 		final Plan p = createProgramPlan(jobName);
 		final List<URL> allJarFiles = getUpdatedJarFiles();
 
-		final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, allJarFiles, globalClasspaths);
-		lastJobExecutionResult = executor.executePlan(p);
+		final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
+		lastJobExecutionResult = executor.executePlan(p, allJarFiles, globalClasspaths);
 		return lastJobExecutionResult;
 	}
 
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 5b46c54..19bc2a0 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -39,6 +39,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.net.URL;
 import java.util.List;
 
 import scala.Option;
@@ -68,18 +69,13 @@ public class FlinkILoopTest extends TestLogger {
 		BDDMockito.given(PlanExecutor.createRemoteExecutor(
 			Matchers.anyString(),
 			Matchers.anyInt(),
-			Matchers.any(Configuration.class),
-			Matchers.any(java.util.List.class),
-			Matchers.any(java.util.List.class)
+			Matchers.any(Configuration.class)
 		)).willAnswer(new Answer<PlanExecutor>() {
 			@Override
 			public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
 				testPlanExecutor.setHost((String) invocation.getArguments()[0]);
 				testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
 				testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
-				testPlanExecutor.setJars((List<String>) invocation.getArguments()[3]);
-				testPlanExecutor.setGlobalClasspaths((List<String>) invocation.getArguments()[4]);
-
 				return testPlanExecutor;
 			}
 		});
@@ -128,7 +124,8 @@ public class FlinkILoopTest extends TestLogger {
 		private List<String> globalClasspaths;
 
 		@Override
-		public JobExecutionResult executePlan(Pipeline plan) throws Exception {
+		public JobExecutionResult executePlan(
+				Pipeline plan, List<URL> jarFiles, List<URL> globalClasspaths) throws Exception {
 			return null;
 		}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 54f4354..a896cf2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import javax.annotation.Nonnull;
 
+import java.util.Collections;
+
 /**
  * The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
  * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
@@ -76,7 +78,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		try {
 			final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
-			return executor.executePlan(streamGraph);
+			return executor.executePlan(streamGraph, Collections.emptyList(), Collections.emptyList());
 		} finally {
 			transformations.clear();
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index b393e5e..1de834a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -279,10 +279,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			final PlanExecutor executor = PlanExecutor.createRemoteExecutor(
 					host,
 					port,
-					clientConfiguration,
-					jarFiles,
-					globalClasspaths);
-			return executor.executePlan(streamGraph).getJobExecutionResult();
+					clientConfiguration);
+
+			return executor.executePlan(streamGraph, jarFiles, globalClasspaths).getJobExecutionResult();
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
index af76ec9..331f6a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.util.Collections;
 
 /**
  * Integration tests for {@link LocalExecutor}.
@@ -62,7 +63,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 			Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
 			wcPlan.setExecutionConfig(new ExecutionConfig());
-			executor.executePlan(wcPlan);
+			executor.executePlan(wcPlan, Collections.emptyList(), Collections.emptyList());
 		} catch (Exception e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());


[flink] 07/15: [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85e5a77de130f4b064d3c70671f1fd7f3fc046e2
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 19 16:42:19 2019 +0200

    [FLINK-14290] Use RemoteExecutor in RemoteStreamEnvironment
---
 .../api/environment/RemoteStreamEnvironment.java      | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 13bde96..b393e5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
@@ -225,7 +226,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	) throws ProgramInvocationException {
 		StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(jobName);
 		return executeRemotely(streamGraph,
-			streamExecutionEnvironment.getClass().getClassLoader(),
 			streamExecutionEnvironment.getConfig(),
 			jarFiles,
 			host,
@@ -242,7 +242,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * @throws ProgramInvocationException
 	 */
 	private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
-		ClassLoader envClassLoader,
 		ExecutionConfig executionConfig,
 		List<URL> jarFiles,
 		String host,
@@ -255,8 +254,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);
-
 		Configuration configuration = new Configuration();
 		configuration.addAll(clientConfiguration);
 
@@ -274,13 +271,18 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 				streamGraph.getJobGraph().getJobID(), e);
 		}
 
-		if (savepointRestoreSettings == null) {
-			savepointRestoreSettings = SavepointRestoreSettings.none();
+		if (savepointRestoreSettings != null) {
+			streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 		}
 
 		try {
-			return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
-				.getJobExecutionResult();
+			final PlanExecutor executor = PlanExecutor.createRemoteExecutor(
+					host,
+					port,
+					clientConfiguration,
+					jarFiles,
+					globalClasspaths);
+			return executor.executePlan(streamGraph).getJobExecutionResult();
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
@@ -318,7 +320,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	@Deprecated
 	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
 		return executeRemotely(streamGraph,
-			this.getClass().getClassLoader(),
 			getConfig(),
 			jarFiles,
 			host,


[flink] 06/15: [FLINK-14290] Add SavepointRestoreSettings to StreamGraph and Generators

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14b19dd4ac9845616604bfc72de6e84506e56caa
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 19 16:37:33 2019 +0200

    [FLINK-14290] Add SavepointRestoreSettings to StreamGraph and Generators
    
    We need this to be able to set SavepointRestoreSettings on a StreamGraph
    that we want to execute via an Executor (PlanExecutor). If we don't set
    the settings on the StreamGraph we would have to pass them to the
    Executor, as we now pass them to the ClusterClient.
    
    This way, the Executor/ClusterClient can remain unaware of this
    streaming-only setting.
---
 .../flink/streaming/api/graph/StreamGraph.java       | 13 ++++++++++++-
 .../streaming/api/graph/StreamGraphGenerator.java    |  9 ++++++++-
 .../api/graph/StreamingJobGraphGenerator.java        |  2 ++
 .../api/graph/StreamGraphGeneratorTest.java          | 18 ++++++++++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java    | 20 +++++++++++++++++++-
 5 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 522c6fc..ec6992f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateBackend;
@@ -84,6 +85,7 @@ public class StreamGraph extends StreamingPlan implements Pipeline {
 
 	private final ExecutionConfig executionConfig;
 	private final CheckpointConfig checkpointConfig;
+	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
 
 	private ScheduleMode scheduleMode;
 
@@ -111,9 +113,10 @@ public class StreamGraph extends StreamingPlan implements Pipeline {
 	private StateBackend stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-	public StreamGraph(ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
+	public StreamGraph(ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, SavepointRestoreSettings savepointRestoreSettings) {
 		this.executionConfig = checkNotNull(executionConfig);
 		this.checkpointConfig = checkNotNull(checkpointConfig);
+		this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings);
 
 		// create an empty new stream graph.
 		clear();
@@ -142,6 +145,14 @@ public class StreamGraph extends StreamingPlan implements Pipeline {
 		return checkpointConfig;
 	}
 
+	public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
+		this.savepointRestoreSettings = savepointRestoreSettings;
+	}
+
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointRestoreSettings;
+	}
+
 	public String getJobName() {
 		return jobName;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index bd6a4b5..9a4eafd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateBackend;
@@ -107,6 +108,8 @@ public class StreamGraphGenerator {
 
 	private final CheckpointConfig checkpointConfig;
 
+	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+
 	private StateBackend stateBackend;
 
 	private boolean chaining = true;
@@ -193,8 +196,12 @@ public class StreamGraphGenerator {
 		return this;
 	}
 
+	public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
+		this.savepointRestoreSettings = savepointRestoreSettings;
+	}
+
 	public StreamGraph generate() {
-		streamGraph = new StreamGraph(executionConfig, checkpointConfig);
+		streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
 		streamGraph.setStateBackend(stateBackend);
 		streamGraph.setChaining(chaining);
 		streamGraph.setScheduleMode(scheduleMode);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 58b4989..83a8103 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -166,6 +166,8 @@ public class StreamingJobGraphGenerator {
 
 		configureCheckpointing();
 
+		jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
+
 		JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
 
 		// set the ExecutionConfig last when it has been finalized
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index e09334a..fc057e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -53,11 +55,14 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -69,6 +74,19 @@ import static org.junit.Assert.assertTrue;
 public class StreamGraphGeneratorTest {
 
 	@Test
+	public void generatorForwardsSavepointRestoreSettings() {
+		StreamGraphGenerator streamGraphGenerator =
+				new StreamGraphGenerator(Collections.emptyList(),
+				new ExecutionConfig(),
+				new CheckpointConfig());
+
+		streamGraphGenerator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("hello"));
+
+		StreamGraph streamGraph = streamGraphGenerator.generate();
+		assertThat(streamGraph.getSavepointRestoreSettings().getRestorePath(), is("hello"));
+	}
+
+	@Test
 	public void testBufferTimeout() {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 1334d28..18b25f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -47,6 +49,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
@@ -68,10 +71,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -137,7 +142,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testDisabledCheckpointing() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamGraph streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig());
+		StreamGraph streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig(), SavepointRestoreSettings.none());
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
@@ -146,6 +151,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
 	}
 
+	@Test
+	public void generatorForwardsSavepointRestoreSettings() {
+		StreamGraph streamGraph = new StreamGraph(
+				new ExecutionConfig(),
+				new CheckpointConfig(),
+				SavepointRestoreSettings.forPath("hello"));
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+		assertThat(savepointRestoreSettings.getRestorePath(), is("hello"));
+	}
+
 	/**
 	 * Verifies that the chain start/end is correctly set.
 	 */


[flink] 02/15: [FLINK-14290] Add Pipeline translation utils for getting a JobGraph from a FlinkPipeline

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8092dafa05f1dea3cbb63888649362ca62573f24
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Wed Sep 11 17:37:33 2019 +0200

    [FLINK-14290] Add Pipeline translation utils for getting a JobGraph from a FlinkPipeline
    
    For now, we can only translate DataSet Plans. In the future we also need
    this for DataStream StreamGraphs.
---
 .../flink/client/FlinkPipelineTranslationUtil.java | 52 ++++++++++++++
 .../flink/client/FlinkPipelineTranslator.java      | 42 +++++++++++
 .../org/apache/flink/client/PlanTranslator.java    | 82 ++++++++++++++++++++++
 3 files changed, 176 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
new file mode 100644
index 0000000..61bda14
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Utility for transforming {@link Pipeline FlinkPipelines} into a {@link JobGraph}. This uses
+ * reflection or service discovery to find the right {@link FlinkPipelineTranslator} for a given
+ * subclass of {@link Pipeline}.
+ */
+public final class FlinkPipelineTranslationUtil {
+
+	/**
+	 * Transmogrifies the given {@link Pipeline} to a {@link JobGraph}.
+	 */
+	public static JobGraph getJobGraph(
+			Pipeline pipeline,
+			Configuration optimizerConfiguration,
+			int defaultParallelism) {
+
+		PlanTranslator planToJobGraphTransmogrifier = new PlanTranslator();
+
+		if (planToJobGraphTransmogrifier.canTranslate(pipeline)) {
+			// we have a DataSet program
+			return planToJobGraphTransmogrifier.translate(pipeline,
+					optimizerConfiguration,
+					defaultParallelism);
+		}
+
+		throw new RuntimeException("Cannot find transmogrifier for given pipeline: " + pipeline);
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java
new file mode 100644
index 0000000..c224422
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * This can be used to turn a {@link Pipeline} into a {@link JobGraph}. There will be
+ * implementations for the different pipeline APIs that Flink supports.
+ */
+public interface FlinkPipelineTranslator {
+
+	/**
+	 * Creates a {@link JobGraph} from the given {@link Pipeline}, using the given
+	 * {@code optimizerConfiguration} and {@code defaultParallelism} for the translation.
+	 */
+	JobGraph translate(
+			Pipeline pipeline,
+			Configuration optimizerConfiguration,
+			int defaultParallelism);
+
+	boolean canTranslate(Pipeline pipeline);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
new file mode 100644
index 0000000..ae18b6b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/PlanTranslator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * {@link FlinkPipelineTranslator} for DataSet API {@link Plan Plans}.
+ */
+public class PlanTranslator implements FlinkPipelineTranslator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PlanTranslator.class);
+
+	@Override
+	public JobGraph translate(
+			Pipeline pipeline,
+			Configuration optimizerConfiguration,
+			int defaultParallelism) {
+		checkArgument(pipeline instanceof Plan, "Given pipeline is not a DataSet Plan.");
+
+		Plan plan = (Plan) pipeline;
+		setDefaultParallelism(plan, defaultParallelism);
+		return compilePlan(plan, optimizerConfiguration);
+	}
+
+	private void setDefaultParallelism(Plan plan, int defaultParallelism) {
+		if (defaultParallelism > 0 && plan.getDefaultParallelism() <= 0) {
+			LOG.debug(
+					"Changing plan default parallelism from {} to {}",
+					plan.getDefaultParallelism(),
+					defaultParallelism);
+			plan.setDefaultParallelism(defaultParallelism);
+		}
+
+		LOG.debug(
+				"Set parallelism {}, plan default parallelism {}",
+				defaultParallelism,
+				plan.getDefaultParallelism());
+	}
+
+	private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
+		Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
+		OptimizedPlan optimizedPlan = optimizer.compile(plan);
+
+		JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
+		return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
+	}
+
+	@Override
+	public boolean canTranslate(Pipeline pipeline) {
+		return pipeline instanceof Plan;
+	}
+}


[flink] 12/15: [FLINK-14391] Add JobID setter in JobGraph

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70a8e0c407a21174f608adf8ff1d4254d9024490
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri Oct 11 14:08:10 2019 +0200

    [FLINK-14391] Add JobID setter in JobGraph
    
    This allows setting the JobID after creation of the JobGraph.
---
 .../main/java/org/apache/flink/runtime/jobgraph/JobGraph.java    | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 377f870..3c6f935 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -68,7 +68,7 @@ public class JobGraph implements Serializable {
 	private final Configuration jobConfiguration = new Configuration();
 
 	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
-	private final JobID jobID;
+	private JobID jobID;
 
 	/** Name of this job. */
 	private final String jobName;
@@ -190,6 +190,13 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Sets the ID of the job.
+	 */
+	public void setJobID(JobID jobID) {
+		this.jobID = jobID;
+	}
+
+	/**
 	 * Returns the name assigned to the job graph.
 	 *
 	 * @return the name assigned to the job graph


[flink] 11/15: [FLINK-14391] Factor out translator discovery in FlinkPipelineTranslationUtil

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98b54e0d9d2bd3ade8ca10d2a70fb5cb7a20a4f7
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Oct 10 17:18:20 2019 +0200

    [FLINK-14391] Factor out translator discovery in FlinkPipelineTranslationUtil
    
    This way, we can reuse the discovery from different methods that we'll
    add in follow-up commits.
---
 .../flink/client/FlinkPipelineTranslationUtil.java     | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
index 33c8027..e54a428 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -38,13 +38,18 @@ public final class FlinkPipelineTranslationUtil {
 			Configuration optimizerConfiguration,
 			int defaultParallelism) {
 
+		FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+
+		return pipelineTranslator.translate(pipeline,
+				optimizerConfiguration,
+				defaultParallelism);
+	}
+
+	private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
 		PlanTranslator planToJobGraphTransmogrifier = new PlanTranslator();
 
 		if (planToJobGraphTransmogrifier.canTranslate(pipeline)) {
-			// we have a DataSet program
-			return planToJobGraphTransmogrifier.translate(pipeline,
-					optimizerConfiguration,
-					defaultParallelism);
+			return planToJobGraphTransmogrifier;
 		}
 
 		FlinkPipelineTranslator streamGraphTranslator = reflectStreamGraphTranslator();
@@ -53,10 +58,7 @@ public final class FlinkPipelineTranslationUtil {
 			throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
 					+ "the given pipeline " + pipeline + ".");
 		}
-
-		return streamGraphTranslator.translate(pipeline,
-				optimizerConfiguration,
-				defaultParallelism);
+		return streamGraphTranslator;
 	}
 
 	private static FlinkPipelineTranslator reflectStreamGraphTranslator() {