You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 04:42:23 UTC
[flink] 12/15: [FLINK-XXXXX] Refactoring the ContextEnvironments
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 051b9edc4b555e47093644ad10ec1c6ec756e14e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 20:58:57 2019 +0100
[FLINK-XXXXX] Refactoring the ContextEnvironments
---
.../java/org/apache/flink/client/ClientUtils.java | 6 +-
.../client/cli/AbstractCustomCommandLine.java | 3 +-
.../org/apache/flink/client/cli/CliFrontend.java | 10 +--
.../flink/client/program/ContextEnvironment.java | 92 ++++------------------
.../client/program/ContextEnvironmentFactory.java | 11 +--
.../flink/client/cli/CliFrontendRunTest.java | 3 +-
.../apache/flink/client/program/ClientTest.java | 67 ++++++++++++++--
.../flink/api/java/ExecutionEnvironment.java | 10 ++-
.../api/environment/StreamContextEnvironment.java | 36 +++------
.../environment/StreamExecutionEnvironment.java | 4 +
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 6 +-
11 files changed, 122 insertions(+), 126 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 1654dff..f599478 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -145,10 +146,11 @@ public enum ClientUtils {
}
public static JobSubmissionResult executeProgram(
+ ExecutorServiceLoader executorServiceLoader,
Configuration configuration,
- ClusterClient<?> client,
PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+ checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -160,8 +162,8 @@ public enum ClientUtils {
final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
+ executorServiceLoader,
configuration,
- client,
userCodeClassLoader,
jobExecutionResult);
ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.cli;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
@Override
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
final Configuration resultingConfiguration = new Configuration(configuration);
- resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+ resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
if (commandLine.hasOption(addressOption.getOpt())) {
String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index d82b377..e8f8179 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,7 @@ public class CliFrontend {
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- executeProgram(configuration, program, client);
+ executeProgram(configuration, program);
} finally {
if (clusterId == null && !executionParameters.getDetachedMode()) {
// terminate the cluster only if we have started it before and if it's not detached
@@ -750,12 +751,11 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected void executeProgram(
- Configuration configuration,
- PackagedProgram program,
- ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
+ final Configuration configuration,
+ final PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
logAndSysout("Starting execution of program");
- JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
+ JobSubmissionResult result = ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9a03271..4561da4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,44 +36,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ContextEnvironment extends ExecutionEnvironment {
- private final ClusterClient<?> client;
-
- private final boolean detached;
-
- private final List<URL> jarFilesToAttach;
-
- private final List<URL> classpathsToAttach;
-
- private final ClassLoader userCodeClassLoader;
-
- private final SavepointRestoreSettings savepointSettings;
-
private final AtomicReference<JobExecutionResult> jobExecutionResult;
private boolean alreadyCalled;
- public ContextEnvironment(
+ ContextEnvironment(
+ final ExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
- final ClusterClient<?> remoteConnection,
final ClassLoader userCodeClassLoader,
final AtomicReference<JobExecutionResult> jobExecutionResult) {
-
- this.client = checkNotNull(remoteConnection);
- this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+ super(executorServiceLoader, configuration);
this.jobExecutionResult = checkNotNull(jobExecutionResult);
- final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
- .fromConfiguration(checkNotNull(configuration));
+ setUserClassloader(checkNotNull(userCodeClassLoader));
- if (accessor.getParallelism() > 0) {
- setParallelism(accessor.getParallelism());
+ final int parallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+ if (parallelism > 0) {
+ setParallelism(parallelism);
}
- this.jarFilesToAttach = accessor.getJars();
- this.classpathsToAttach = accessor.getClasspaths();
- this.savepointSettings = accessor.getSavepointRestoreSettings();
- this.detached = accessor.getDetachedMode();
-
this.alreadyCalled = false;
}
@@ -86,29 +62,13 @@ public class ContextEnvironment extends ExecutionEnvironment {
public JobExecutionResult execute(String jobName) throws Exception {
verifyExecuteIsCalledOnceWhenInDetachedMode();
- Plan plan = createProgramPlan(jobName);
-
- JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
- plan,
- client.getFlinkConfiguration(),
- getParallelism());
-
- jobGraph.addJars(this.jarFilesToAttach);
- jobGraph.setClasspaths(this.classpathsToAttach);
-
- if (detached) {
- lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
- } else {
- lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
- }
-
- setJobExecutionResult(lastJobExecutionResult);
-
- return lastJobExecutionResult;
+ final JobExecutionResult jobExecutionResult = super.execute(jobName);
+ setJobExecutionResult(jobExecutionResult);
+ return jobExecutionResult;
}
private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
- if (alreadyCalled && detached) {
+ if (alreadyCalled && !getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
}
alreadyCalled = true;
@@ -123,30 +83,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
}
- public ClusterClient<?> getClient() {
- return this.client;
- }
-
- public List<URL> getJars(){
- return jarFilesToAttach;
- }
-
- public List<URL> getClasspaths(){
- return classpathsToAttach;
- }
-
- public ClassLoader getUserCodeClassLoader() {
- return userCodeClassLoader;
- }
-
- public SavepointRestoreSettings getSavepointRestoreSettings() {
- return savepointSettings;
- }
-
- public boolean isDetached() {
- return detached;
- }
-
// --------------------------------------------------------------------------------------------
public static void setAsContext(ContextEnvironmentFactory factory) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ab589f2..1092c53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,9 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
- private final Configuration configuration;
+ private final ExecutorServiceLoader executorServiceLoader;
- private final ClusterClient<?> client;
+ private final Configuration configuration;
private final ClassLoader userCodeClassLoader;
@@ -47,12 +48,12 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
private boolean alreadyCalled;
public ContextEnvironmentFactory(
+ final ExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
- final ClusterClient<?> client,
final ClassLoader userCodeClassLoader,
final AtomicReference<JobExecutionResult> jobExecutionResult) {
+ this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(configuration);
- this.client = checkNotNull(client);
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
this.jobExecutionResult = checkNotNull(jobExecutionResult);
@@ -63,8 +64,8 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
public ExecutionEnvironment createExecutionEnvironment() {
verifyCreateIsCalledOnceWhenInDetachedMode();
return new ContextEnvironment(
+ executorServiceLoader,
configuration,
- client,
userCodeClassLoader,
jobExecutionResult);
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..f6c536e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
}
@Override
- protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+ protected void executeProgram(Configuration configuration, PackagedProgram program) {
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..1e73ac9f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
@@ -30,6 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.JobClientImpl;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
@@ -37,6 +40,9 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -56,9 +62,13 @@ import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.annotation.Nonnull;
+
import java.net.URL;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -80,6 +90,8 @@ public class ClientTest extends TestLogger {
private Configuration config;
+ private static final String TEST_EXECUTOR_NAME = "test_executor";
+
private static final String ACCUMULATOR_NAME = "test_accumulator";
private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
@@ -100,6 +112,7 @@ public class ClientTest extends TestLogger {
private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
final Configuration configuration = new Configuration();
+ configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
configuration.set(DeploymentOptions.ATTACHED, !detached);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
@@ -116,7 +129,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);
- ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -127,7 +141,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);
- ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -138,7 +153,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);
- ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -149,7 +165,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);
- ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -160,7 +177,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);
- ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -209,7 +227,7 @@ public class ClientTest extends TestLogger {
try {
final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
- ClientUtils.executeProgram(configuration, client, packagedProgramMock);
+ ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock);
fail("Creating the local execution environment should not be possible");
}
catch (InvalidProgramException e) {
@@ -346,4 +364,41 @@ public class ClientTest extends TestLogger {
env.execute().getAllAccumulatorResults();
}
}
+
+ private static final class TestExecutorServiceLoader implements ExecutorServiceLoader {
+
+ private final ClusterClient<?> clusterClient;
+
+ private final Plan plan;
+
+ TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) {
+ this.clusterClient = checkNotNull(clusterClient);
+ this.plan = checkNotNull(plan);
+ }
+
+ @Override
+ public ExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
+ return new ExecutorFactory() {
+ @Override
+ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
+ return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+ }
+
+ @Override
+ public Executor getExecutor(@Nonnull Configuration configuration) {
+ return (pipeline, config) -> {
+ final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+ final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
+
+ final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
+ jobGraph.addJars(accessor.getJars());
+ jobGraph.setClasspaths(accessor.getClasspaths());
+
+ final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+ return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobID));
+ };
+ }
+ };
+ }
+ }
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6e64788..673af75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -156,7 +156,15 @@ public class ExecutionEnvironment {
this.userClassloader = checkNotNull(userClassloader);
}
- protected Configuration getConfiguration() {
+ public ClassLoader getUserCodeClassLoader() {
+ return userClassloader;
+ }
+
+ public ExecutorServiceLoader getExecutorServiceLoader() {
+ return executorServiceLoader;
+ }
+
+ public Configuration getConfiguration() {
return this.configuration;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..3cdb483 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
* testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -35,37 +34,24 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final ContextEnvironment ctx;
- protected StreamContextEnvironment(ContextEnvironment ctx) {
+ StreamContextEnvironment(final ContextEnvironment ctx) {
+ super(checkNotNull(ctx).getExecutorServiceLoader(), ctx.getConfiguration());
+
this.ctx = ctx;
- if (ctx.getParallelism() > 0) {
- setParallelism(ctx.getParallelism());
+
+ final int parallelism = ctx.getParallelism();
+ if (parallelism > 0) {
+ setParallelism(parallelism);
}
+ setUserClassloader(ctx.getUserCodeClassLoader());
}
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
- JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
- streamGraph,
- ctx.getClient().getFlinkConfiguration(),
- getParallelism());
-
- jobGraph.addJars(ctx.getJars());
- jobGraph.setClasspaths(ctx.getClasspaths());
-
- // running from the CLI will override the savepoint restore settings
- jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
- JobExecutionResult jobExecutionResult;
- if (ctx.isDetached()) {
- jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
- } else {
- jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
- }
-
+ final JobExecutionResult jobExecutionResult = super.execute(streamGraph);
ctx.setJobExecutionResult(jobExecutionResult);
-
return jobExecutionResult;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b8544cf..ea50d04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -179,6 +179,10 @@ public class StreamExecutionEnvironment {
this.userClassloader = checkNotNull(userClassloader);
}
+ public ClassLoader getUserCodeClassLoader() {
+ return userClassloader;
+ }
+
protected Configuration getConfiguration() {
return this.configuration;
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 362deda..388dea0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
// we ignore the addressOption because it can only contain "yarn-cluster"
final Configuration effectiveConfiguration = new Configuration(configuration);
- effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+ effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+ } else {
+ effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
}
if (commandLine.hasOption(jmMemory.getOpt())) {