You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/12 07:58:54 UTC
[flink] 14/24: [FLINK-XXXXX] Wiring the executors to the context
envs
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1d021a1e4559ddb313cfcf98353a626539b9553a
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 5 13:54:51 2019 +0100
[FLINK-XXXXX] Wiring the executors to the context envs
---
.../flink/client/program/ContextEnvironment.java | 99 ++++++----------------
.../client/program/ContextEnvironmentFactory.java | 61 ++++---------
.../api/environment/StreamContextEnvironment.java | 25 +-----
3 files changed, 44 insertions(+), 141 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..b43ac15 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,85 +21,58 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Execution Environment for remote execution with the Client.
*/
public class ContextEnvironment extends ExecutionEnvironment {
- private final ClusterClient<?> client;
-
- private final boolean detached;
-
- private final List<URL> jarFilesToAttach;
-
- private final List<URL> classpathsToAttach;
+ private final ExecutorServiceLoader executorServiceLoader;
- private final ClassLoader userCodeClassLoader;
-
- private final SavepointRestoreSettings savepointSettings;
+ private final Configuration configuration;
private final AtomicReference<JobExecutionResult> jobExecutionResult;
private boolean alreadyCalled;
public ContextEnvironment(
- ClusterClient<?> remoteConnection,
- List<URL> jarFiles,
- List<URL> classpaths,
- ClassLoader userCodeClassLoader,
- SavepointRestoreSettings savepointSettings,
- boolean detached,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = remoteConnection;
- this.jarFilesToAttach = jarFiles;
- this.classpathsToAttach = classpaths;
- this.userCodeClassLoader = userCodeClassLoader;
- this.savepointSettings = savepointSettings;
-
- this.detached = detached;
+ final ExecutorServiceLoader executorServiceLoader,
+ final Configuration configuration,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+ super(executorServiceLoader, configuration);
+ this.executorServiceLoader = checkNotNull(executorServiceLoader);
+ this.configuration = checkNotNull(configuration);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
this.alreadyCalled = false;
+ }
- this.jobExecutionResult = jobExecutionResult;
+ public ExecutorServiceLoader getExecutorServiceLoader() {
+ return executorServiceLoader;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
verifyExecuteIsCalledOnceWhenInDetachedMode();
-
- Plan plan = createProgramPlan(jobName);
-
- JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
- plan,
- client.getFlinkConfiguration(),
- getParallelism());
-
- jobGraph.addJars(this.jarFilesToAttach);
- jobGraph.setClasspaths(this.classpathsToAttach);
-
- if (detached) {
- lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
- } else {
- lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
- }
-
+ lastJobExecutionResult = super.execute(jobName);
setJobExecutionResult(lastJobExecutionResult);
-
return lastJobExecutionResult;
}
private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
- if (alreadyCalled && detached) {
+ if (alreadyCalled && !configuration.getBoolean(ExecutionOptions.ATTACHED)) {
throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
}
alreadyCalled = true;
@@ -114,30 +87,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
}
- public ClusterClient<?> getClient() {
- return this.client;
- }
-
- public List<URL> getJars(){
- return jarFilesToAttach;
- }
-
- public List<URL> getClasspaths(){
- return classpathsToAttach;
- }
-
- public ClassLoader getUserCodeClassLoader() {
- return userCodeClassLoader;
- }
-
- public SavepointRestoreSettings getSavepointRestoreSettings() {
- return savepointSettings;
- }
-
- public boolean isDetached() {
- return detached;
- }
-
// --------------------------------------------------------------------------------------------
public static void setAsContext(ContextEnvironmentFactory factory) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..3ff81cd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,15 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
@@ -35,64 +38,36 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
- private final ClusterClient<?> client;
-
- private final List<URL> jarFilesToAttach;
-
- private final List<URL> classpathsToAttach;
-
- private final ClassLoader userCodeClassLoader;
+ private final ExecutorServiceLoader executorServiceLoader;
- private final int defaultParallelism;
-
- private final boolean isDetached;
-
- private final SavepointRestoreSettings savepointSettings;
+ private final Configuration configuration;
private final AtomicReference<JobExecutionResult> jobExecutionResult;
private boolean alreadyCalled;
public ContextEnvironmentFactory(
- ClusterClient<?> client,
- List<URL> jarFilesToAttach,
- List<URL> classpathsToAttach,
- ClassLoader userCodeClassLoader,
- int defaultParallelism,
- boolean isDetached,
- SavepointRestoreSettings savepointSettings,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = client;
- this.jarFilesToAttach = jarFilesToAttach;
- this.classpathsToAttach = classpathsToAttach;
- this.userCodeClassLoader = userCodeClassLoader;
- this.defaultParallelism = defaultParallelism;
- this.isDetached = isDetached;
- this.savepointSettings = savepointSettings;
+ final ExecutorServiceLoader executorServiceLoader,
+ final Configuration configuration,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+ this.executorServiceLoader = checkNotNull(executorServiceLoader);
+ this.configuration = checkNotNull(configuration);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
this.alreadyCalled = false;
- this.jobExecutionResult = jobExecutionResult;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
verifyCreateIsCalledOnceWhenInDetachedMode();
- final ContextEnvironment environment = new ContextEnvironment(
- client,
- jarFilesToAttach,
- classpathsToAttach,
- userCodeClassLoader,
- savepointSettings,
- isDetached,
- jobExecutionResult);
- if (defaultParallelism > 0) {
- environment.setParallelism(defaultParallelism);
- }
+ final ContextEnvironment environment = new ContextEnvironment(executorServiceLoader, configuration, jobExecutionResult);
+ environment.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)); // TODO: 05.11.19 check this
return environment;
}
private void verifyCreateIsCalledOnceWhenInDetachedMode() {
- if (isDetached && alreadyCalled) {
+ if (!configuration.getBoolean(ExecutionOptions.ATTACHED) && alreadyCalled) {
throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
}
alreadyCalled = true;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..a0a53cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,10 +19,7 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
@@ -36,6 +33,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final ContextEnvironment ctx;
protected StreamContextEnvironment(ContextEnvironment ctx) {
+ super(ctx.getExecutorServiceLoader(), ctx.getConfiguration());
this.ctx = ctx;
if (ctx.getParallelism() > 0) {
setParallelism(ctx.getParallelism());
@@ -45,27 +43,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
-
- JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
- streamGraph,
- ctx.getClient().getFlinkConfiguration(),
- getParallelism());
-
- jobGraph.addJars(ctx.getJars());
- jobGraph.setClasspaths(ctx.getClasspaths());
-
- // running from the CLI will override the savepoint restore settings
- jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
- JobExecutionResult jobExecutionResult;
- if (ctx.isDetached()) {
- jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
- } else {
- jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
- }
-
+ JobExecutionResult jobExecutionResult = super.execute(streamGraph);
ctx.setJobExecutionResult(jobExecutionResult);
-
return jobExecutionResult;
}
}