You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/12 07:58:54 UTC

[flink] 14/24: [FLINK-XXXXX] Wiring the executors to the context envs

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d021a1e4559ddb313cfcf98353a626539b9553a
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 5 13:54:51 2019 +0100

    [FLINK-XXXXX] Wiring the executors to the context envs
---
 .../flink/client/program/ContextEnvironment.java   | 99 ++++++----------------
 .../client/program/ContextEnvironmentFactory.java  | 61 ++++---------
 .../api/environment/StreamContextEnvironment.java  | 25 +-----
 3 files changed, 44 insertions(+), 141 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..b43ac15 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,85 +21,58 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private final ClusterClient<?> client;
-
-	private final boolean detached;
-
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final ClassLoader userCodeClassLoader;
-
-	private final SavepointRestoreSettings savepointSettings;
+	private final Configuration configuration;
 
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironment(
-		ClusterClient<?> remoteConnection,
-		List<URL> jarFiles,
-		List<URL> classpaths,
-		ClassLoader userCodeClassLoader,
-		SavepointRestoreSettings savepointSettings,
-		boolean detached,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = remoteConnection;
-		this.jarFilesToAttach = jarFiles;
-		this.classpathsToAttach = classpaths;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointSettings = savepointSettings;
-
-		this.detached = detached;
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration configuration,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		super(executorServiceLoader, configuration);
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
+		this.configuration = checkNotNull(configuration);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
 		this.alreadyCalled = false;
+	}
 
-		this.jobExecutionResult = jobExecutionResult;
+	public ExecutorServiceLoader getExecutorServiceLoader() {
+		return executorServiceLoader;
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
-
-		Plan plan = createProgramPlan(jobName);
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
-				client.getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(this.jarFilesToAttach);
-		jobGraph.setClasspaths(this.classpathsToAttach);
-
-		if (detached) {
-			lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
-		} else {
-			lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
-		}
-
+		lastJobExecutionResult = super.execute(jobName);
 		setJobExecutionResult(lastJobExecutionResult);
-
 		return lastJobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-		if (alreadyCalled && detached) {
+		if (alreadyCalled && !configuration.getBoolean(ExecutionOptions.ATTACHED)) {
 			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
 		}
 		alreadyCalled = true;
@@ -114,30 +87,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
 	}
 
-	public ClusterClient<?> getClient() {
-		return this.client;
-	}
-
-	public List<URL> getJars(){
-		return jarFilesToAttach;
-	}
-
-	public List<URL> getClasspaths(){
-		return classpathsToAttach;
-	}
-
-	public ClassLoader getUserCodeClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public SavepointRestoreSettings getSavepointRestoreSettings() {
-		return savepointSettings;
-	}
-
-	public boolean isDetached() {
-		return detached;
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	public static void setAsContext(ContextEnvironmentFactory factory) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..3ff81cd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,15 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs that are
  * submitted through a pre-configured client connection.
@@ -35,64 +38,36 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient<?> client;
-
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-
-	private final ClassLoader userCodeClassLoader;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final int defaultParallelism;
-
-	private final boolean isDetached;
-
-	private final SavepointRestoreSettings savepointSettings;
+	private final Configuration configuration;
 
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
-		ClusterClient<?> client,
-		List<URL> jarFilesToAttach,
-		List<URL> classpathsToAttach,
-		ClassLoader userCodeClassLoader,
-		int defaultParallelism,
-		boolean isDetached,
-		SavepointRestoreSettings savepointSettings,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = client;
-		this.jarFilesToAttach = jarFilesToAttach;
-		this.classpathsToAttach = classpathsToAttach;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.defaultParallelism = defaultParallelism;
-		this.isDetached = isDetached;
-		this.savepointSettings = savepointSettings;
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration configuration,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
+		this.configuration = checkNotNull(configuration);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
 		this.alreadyCalled = false;
-		this.jobExecutionResult = jobExecutionResult;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
 
-		final ContextEnvironment environment = new ContextEnvironment(
-			client,
-			jarFilesToAttach,
-			classpathsToAttach,
-			userCodeClassLoader,
-			savepointSettings,
-			isDetached,
-			jobExecutionResult);
-		if (defaultParallelism > 0) {
-			environment.setParallelism(defaultParallelism);
-		}
+		final ContextEnvironment environment = new ContextEnvironment(executorServiceLoader, configuration, jobExecutionResult);
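+		environment.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)); // TODO(05.11.19): confirm that always applying the configured default parallelism is intended; the previous code only set it when the value was > 0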
 		return environment;
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-		if (isDetached && alreadyCalled) {
+		if (!configuration.getBoolean(ExecutionOptions.ATTACHED) && alreadyCalled) {
 			throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
 		}
 		alreadyCalled = true;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..a0a53cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,10 +19,7 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 /**
@@ -36,6 +33,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	private final ContextEnvironment ctx;
 
 	protected StreamContextEnvironment(ContextEnvironment ctx) {
+		super(ctx.getExecutorServiceLoader(), ctx.getConfiguration());
 		this.ctx = ctx;
 		if (ctx.getParallelism() > 0) {
 			setParallelism(ctx.getParallelism());
@@ -45,27 +43,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				streamGraph,
-				ctx.getClient().getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(ctx.getJars());
-		jobGraph.setClasspaths(ctx.getClasspaths());
-
-		// running from the CLI will override the savepoint restore settings
-		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-		JobExecutionResult jobExecutionResult;
-		if (ctx.isDetached()) {
-			jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
-		} else {
-			jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
-		}
-
+		JobExecutionResult jobExecutionResult = super.execute(streamGraph);
 		ctx.setJobExecutionResult(jobExecutionResult);
-
 		return jobExecutionResult;
 	}
 }