You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:01 UTC

[flink] 05/19: [hotfix] Simplified the construction of the ContextEnvironment

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 325b49fd368adb63097b165782d07488be705103
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 16:46:35 2019 +0100

    [hotfix] Simplified the construction of the ContextEnvironment
---
 .../java/org/apache/flink/client/ClientUtils.java  | 18 ++-----
 .../flink/client/program/ContextEnvironment.java   | 42 ++++++++++------
 .../client/program/ContextEnvironmentFactory.java  | 58 +++++++---------------
 3 files changed, 49 insertions(+), 69 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..2c80236 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -154,9 +153,6 @@ public enum ClientUtils {
 
 		final List<URL> jobJars = executionConfigAccessor.getJars();
 		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-		final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
-		final int parallelism = executionConfigAccessor.getParallelism();
-		final boolean detached = executionConfigAccessor.getDetachedMode();
 
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
@@ -164,19 +160,15 @@ public enum ClientUtils {
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
-			LOG.info("Starting program (detached: {})", detached);
+			LOG.info("Starting program (detached: {})", executionConfigAccessor.getDetachedMode());
 
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
-				client,
-				jobJars,
-				classpaths,
-				userCodeClassLoader,
-				parallelism,
-				detached,
-				savepointSettings,
-				jobExecutionResult);
+					configuration,
+					client,
+					userCodeClassLoader,
+					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
 
 			try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9d3927a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
@@ -32,6 +34,8 @@ import java.net.URL;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -54,23 +58,29 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private boolean alreadyCalled;
 
 	public ContextEnvironment(
-		ClusterClient<?> remoteConnection,
-		List<URL> jarFiles,
-		List<URL> classpaths,
-		ClassLoader userCodeClassLoader,
-		SavepointRestoreSettings savepointSettings,
-		boolean detached,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = remoteConnection;
-		this.jarFilesToAttach = jarFiles;
-		this.classpathsToAttach = classpaths;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointSettings = savepointSettings;
-
-		this.detached = detached;
-		this.alreadyCalled = false;
+			final Configuration configuration,
+			final ClusterClient<?> remoteConnection,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+				.fromConfiguration(checkNotNull(configuration));
+
+		this.jarFilesToAttach = accessor.getJars();
+		this.classpathsToAttach = accessor.getClasspaths();
+		this.savepointSettings = accessor.getSavepointRestoreSettings();
+		this.detached = accessor.getDetachedMode();
+
+		final int parallelism = accessor.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
+		}
 
-		this.jobExecutionResult = jobExecutionResult;
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+		this.client = checkNotNull(remoteConnection);
+
+		this.alreadyCalled = false;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..f1c9ad6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs that are
  * submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient<?> client;
-
-	private final List<URL> jarFilesToAttach;
+	private final Configuration configuration;
 
-	private final List<URL> classpathsToAttach;
+	private final ClusterClient<?> client;
 
 	private final ClassLoader userCodeClassLoader;
 
-	private final int defaultParallelism;
-
-	private final boolean isDetached;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
-		ClusterClient<?> client,
-		List<URL> jarFilesToAttach,
-		List<URL> classpathsToAttach,
-		ClassLoader userCodeClassLoader,
-		int defaultParallelism,
-		boolean isDetached,
-		SavepointRestoreSettings savepointSettings,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = client;
-		this.jarFilesToAttach = jarFilesToAttach;
-		this.classpathsToAttach = classpathsToAttach;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.defaultParallelism = defaultParallelism;
-		this.isDetached = isDetached;
-		this.savepointSettings = savepointSettings;
+			final Configuration configuration,
+			final ClusterClient<?> client,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		this.configuration = checkNotNull(configuration);
+		this.client = checkNotNull(client);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 		this.alreadyCalled = false;
-		this.jobExecutionResult = jobExecutionResult;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
-
-		final ContextEnvironment environment = new ContextEnvironment(
+		return new ContextEnvironment(
+			configuration,
 			client,
-			jarFilesToAttach,
-			classpathsToAttach,
 			userCodeClassLoader,
-			savepointSettings,
-			isDetached,
 			jobExecutionResult);
-		if (defaultParallelism > 0) {
-			environment.setParallelism(defaultParallelism);
-		}
-		return environment;
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-		if (isDetached && alreadyCalled) {
+		if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
 			throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
 		}
 		alreadyCalled = true;