You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 21:00:39 UTC
[flink] 07/13: [hotfix] Simplify ContextEnvironment construction to use configuration
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f960c8b76a16cc00208e05f1cb6235305dbc9d25
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 15:22:19 2019 +0100
[hotfix] Simplify ContextEnvironment construction to use configuration
---
.../java/org/apache/flink/client/ClientUtils.java | 25 ++-------
.../flink/client/program/ContextEnvironment.java | 41 ++++++++------
.../client/program/ContextEnvironmentFactory.java | 64 +++++++---------------
.../apache/flink/client/program/ClientTest.java | 5 ++
4 files changed, 57 insertions(+), 78 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..1654dff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -30,10 +29,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
@@ -150,33 +149,21 @@ public enum ClientUtils {
ClusterClient<?> client,
PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
- final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
- final List<URL> jobJars = executionConfigAccessor.getJars();
- final List<URL> classpaths = executionConfigAccessor.getClasspaths();
- final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
- final int parallelism = executionConfigAccessor.getParallelism();
- final boolean detached = executionConfigAccessor.getDetachedMode();
-
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
- LOG.info("Starting program (detached: {})", detached);
+ LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
- client,
- jobJars,
- classpaths,
- userCodeClassLoader,
- parallelism,
- detached,
- savepointSettings,
- jobExecutionResult);
+ configuration,
+ client,
+ userCodeClassLoader,
+ jobExecutionResult);
ContextEnvironment.setAsContext(factory);
try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9a03271 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -32,6 +34,8 @@ import java.net.URL;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Execution Environment for remote execution with the Client.
*/
@@ -54,23 +58,28 @@ public class ContextEnvironment extends ExecutionEnvironment {
private boolean alreadyCalled;
public ContextEnvironment(
- ClusterClient<?> remoteConnection,
- List<URL> jarFiles,
- List<URL> classpaths,
- ClassLoader userCodeClassLoader,
- SavepointRestoreSettings savepointSettings,
- boolean detached,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = remoteConnection;
- this.jarFilesToAttach = jarFiles;
- this.classpathsToAttach = classpaths;
- this.userCodeClassLoader = userCodeClassLoader;
- this.savepointSettings = savepointSettings;
-
- this.detached = detached;
- this.alreadyCalled = false;
+ final Configuration configuration,
+ final ClusterClient<?> remoteConnection,
+ final ClassLoader userCodeClassLoader,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+ this.client = checkNotNull(remoteConnection);
+ this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
+ final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+ .fromConfiguration(checkNotNull(configuration));
- this.jobExecutionResult = jobExecutionResult;
+ if (accessor.getParallelism() > 0) {
+ setParallelism(accessor.getParallelism());
+ }
+
+ this.jarFilesToAttach = accessor.getJars();
+ this.classpathsToAttach = accessor.getClasspaths();
+ this.savepointSettings = accessor.getSavepointRestoreSettings();
+ this.detached = accessor.getDetachedMode();
+
+ this.alreadyCalled = false;
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..ab589f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
-import java.net.URL;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
- private final ClusterClient<?> client;
-
- private final List<URL> jarFilesToAttach;
+ private final Configuration configuration;
- private final List<URL> classpathsToAttach;
+ private final ClusterClient<?> client;
private final ClassLoader userCodeClassLoader;
- private final int defaultParallelism;
-
- private final boolean isDetached;
-
- private final SavepointRestoreSettings savepointSettings;
-
private final AtomicReference<JobExecutionResult> jobExecutionResult;
private boolean alreadyCalled;
public ContextEnvironmentFactory(
- ClusterClient<?> client,
- List<URL> jarFilesToAttach,
- List<URL> classpathsToAttach,
- ClassLoader userCodeClassLoader,
- int defaultParallelism,
- boolean isDetached,
- SavepointRestoreSettings savepointSettings,
- AtomicReference<JobExecutionResult> jobExecutionResult) {
- this.client = client;
- this.jarFilesToAttach = jarFilesToAttach;
- this.classpathsToAttach = classpathsToAttach;
- this.userCodeClassLoader = userCodeClassLoader;
- this.defaultParallelism = defaultParallelism;
- this.isDetached = isDetached;
- this.savepointSettings = savepointSettings;
+ final Configuration configuration,
+ final ClusterClient<?> client,
+ final ClassLoader userCodeClassLoader,
+ final AtomicReference<JobExecutionResult> jobExecutionResult) {
+ this.configuration = checkNotNull(configuration);
+ this.client = checkNotNull(client);
+ this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+ this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
this.alreadyCalled = false;
- this.jobExecutionResult = jobExecutionResult;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
verifyCreateIsCalledOnceWhenInDetachedMode();
-
- final ContextEnvironment environment = new ContextEnvironment(
- client,
- jarFilesToAttach,
- classpathsToAttach,
- userCodeClassLoader,
- savepointSettings,
- isDetached,
- jobExecutionResult);
- if (defaultParallelism > 0) {
- environment.setParallelism(defaultParallelism);
- }
- return environment;
+ return new ContextEnvironment(
+ configuration,
+ client,
+ userCodeClassLoader,
+ jobExecutionResult);
}
private void verifyCreateIsCalledOnceWhenInDetachedMode() {
- if (isDetached && alreadyCalled) {
+ if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
}
alreadyCalled = true;
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
@Test
public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+ when(packagedProgramMock.getUserCodeClassLoader())
+ .thenReturn(packagedProgramMock.getClass().getClassLoader());
+
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {