You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:01 UTC
[flink] 05/19: [hotfix] Simplified the construction of the ContextEnvironment
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 325b49fd368adb63097b165782d07488be705103
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 16:46:35 2019 +0100
[hotfix] Simplified the construction of the ContextEnvironment
---
.../java/org/apache/flink/client/ClientUtils.java | 18 ++-----
.../flink/client/program/ContextEnvironment.java | 42 ++++++++++------
.../client/program/ContextEnvironmentFactory.java | 58 +++++++---------------
3 files changed, 49 insertions(+), 69 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..2c80236 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
@@ -154,9 +153,6 @@ public enum ClientUtils {
final List<URL> jobJars = executionConfigAccessor.getJars();
final List<URL> classpaths = executionConfigAccessor.getClasspaths();
- final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
- final int parallelism = executionConfigAccessor.getParallelism();
- final boolean detached = executionConfigAccessor.getDetachedMode();
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
@@ -164,19 +160,15 @@ public enum ClientUtils {
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
- LOG.info("Starting program (detached: {})", detached);
+ LOG.info("Starting program (detached: {})", executionConfigAccessor.getDetachedMode());
final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
- client,
- jobJars,
- classpaths,
- userCodeClassLoader,
- parallelism,
- detached,
- savepointSettings,
- jobExecutionResult);
+ configuration,
+ client,
+ userCodeClassLoader,
+ jobExecutionResult);
ContextEnvironment.setAsContext(factory);
try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9d3927a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -32,6 +34,8 @@ import java.net.URL;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Execution Environment for remote execution with the Client.
*/
@@ -54,23 +58,29 @@ public class ContextEnvironment extends ExecutionEnvironment {
private boolean alreadyCalled;
public ContextEnvironment(
- ClusterClient<?> remoteConnection,
- List<URL> jarFiles,
- List<URL> classpaths,
- ClassLoader userCodeClassLoader,
- SavepointRestoreSettings savepointSettings,
- boolean detached,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = remoteConnection;
- this.jarFilesToAttach = jarFiles;
- this.classpathsToAttach = classpaths;
- this.userCodeClassLoader = userCodeClassLoader;
- this.savepointSettings = savepointSettings;
-
- this.detached = detached;
- this.alreadyCalled = false;
+ final Configuration configuration,
+ final ClusterClient<?> remoteConnection,
+ final ClassLoader userCodeClassLoader,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+ final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+ .fromConfiguration(checkNotNull(configuration));
+
+ this.jarFilesToAttach = accessor.getJars();
+ this.classpathsToAttach = accessor.getClasspaths();
+ this.savepointSettings = accessor.getSavepointRestoreSettings();
+ this.detached = accessor.getDetachedMode();
+
+ final int parallelism = accessor.getParallelism();
+ if (parallelism > 0) {
+ setParallelism(parallelism);
+ }
- this.jobExecutionResult = jobExecutionResult;
+ this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
+ this.client = checkNotNull(remoteConnection);
+
+ this.alreadyCalled = false;
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..f1c9ad6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
- private final ClusterClient<?> client;
-
- private final List<URL> jarFilesToAttach;
+ private final Configuration configuration;
- private final List<URL> classpathsToAttach;
+ private final ClusterClient<?> client;
private final ClassLoader userCodeClassLoader;
- private final int defaultParallelism;
-
- private final boolean isDetached;
-
- private final SavepointRestoreSettings savepointSettings;
-
private final AtomicReference<JobExecutionResult> jobExecutionResult;
private boolean alreadyCalled;
public ContextEnvironmentFactory(
- ClusterClient<?> client,
- List<URL> jarFilesToAttach,
- List<URL> classpathsToAttach,
- ClassLoader userCodeClassLoader,
- int defaultParallelism,
- boolean isDetached,
- SavepointRestoreSettings savepointSettings,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = client;
- this.jarFilesToAttach = jarFilesToAttach;
- this.classpathsToAttach = classpathsToAttach;
- this.userCodeClassLoader = userCodeClassLoader;
- this.defaultParallelism = defaultParallelism;
- this.isDetached = isDetached;
- this.savepointSettings = savepointSettings;
+ final Configuration configuration,
+ final ClusterClient<?> client,
+ final ClassLoader userCodeClassLoader,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+ this.configuration = checkNotNull(configuration);
+ this.client = checkNotNull(client);
+ this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
this.alreadyCalled = false;
- this.jobExecutionResult = jobExecutionResult;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
verifyCreateIsCalledOnceWhenInDetachedMode();
-
- final ContextEnvironment environment = new ContextEnvironment(
+ return new ContextEnvironment(
+ configuration,
client,
- jarFilesToAttach,
- classpathsToAttach,
userCodeClassLoader,
- savepointSettings,
- isDetached,
jobExecutionResult);
- if (defaultParallelism > 0) {
- environment.setParallelism(defaultParallelism);
- }
- return environment;
}
private void verifyCreateIsCalledOnceWhenInDetachedMode() {
- if (isDetached && alreadyCalled) {
+ if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
}
alreadyCalled = true;