You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 21:00:39 UTC

[flink] 07/13: [hotfix] Simplify ContextEnvironment construction to use configuration

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f960c8b76a16cc00208e05f1cb6235305dbc9d25
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 15:22:19 2019 +0100

    [hotfix] Simplify ContextEnvironment construction to use configuration
---
 .../java/org/apache/flink/client/ClientUtils.java  | 25 ++-------
 .../flink/client/program/ContextEnvironment.java   | 41 ++++++++------
 .../client/program/ContextEnvironmentFactory.java  | 64 +++++++---------------
 .../apache/flink/client/program/ClientTest.java    |  5 ++
 4 files changed, 57 insertions(+), 78 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..1654dff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -30,10 +29,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -150,33 +149,21 @@ public enum ClientUtils {
 			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> jobJars = executionConfigAccessor.getJars();
-		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-		final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
-		final int parallelism = executionConfigAccessor.getParallelism();
-		final boolean detached = executionConfigAccessor.getDetachedMode();
-
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
-			LOG.info("Starting program (detached: {})", detached);
+			LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
 
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
-				client,
-				jobJars,
-				classpaths,
-				userCodeClassLoader,
-				parallelism,
-				detached,
-				savepointSettings,
-				jobExecutionResult);
+					configuration,
+					client,
+					userCodeClassLoader,
+					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
 
 			try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9a03271 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
@@ -32,6 +34,8 @@ import java.net.URL;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -54,23 +58,28 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private boolean alreadyCalled;
 
 	public ContextEnvironment(
-		ClusterClient<?> remoteConnection,
-		List<URL> jarFiles,
-		List<URL> classpaths,
-		ClassLoader userCodeClassLoader,
-		SavepointRestoreSettings savepointSettings,
-		boolean detached,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = remoteConnection;
-		this.jarFilesToAttach = jarFiles;
-		this.classpathsToAttach = classpaths;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointSettings = savepointSettings;
-
-		this.detached = detached;
-		this.alreadyCalled = false;
+			final Configuration configuration,
+			final ClusterClient<?> remoteConnection,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		this.client = checkNotNull(remoteConnection);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
+		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+				.fromConfiguration(checkNotNull(configuration));
 
-		this.jobExecutionResult = jobExecutionResult;
+		if (accessor.getParallelism() > 0) {
+			setParallelism(accessor.getParallelism());
+		}
+
+		this.jarFilesToAttach = accessor.getJars();
+		this.classpathsToAttach = accessor.getClasspaths();
+		this.savepointSettings = accessor.getSavepointRestoreSettings();
+		this.detached = accessor.getDetachedMode();
+
+		this.alreadyCalled = false;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..ab589f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs that are
  * submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient<?> client;
-
-	private final List<URL> jarFilesToAttach;
+	private final Configuration configuration;
 
-	private final List<URL> classpathsToAttach;
+	private final ClusterClient<?> client;
 
 	private final ClassLoader userCodeClassLoader;
 
-	private final int defaultParallelism;
-
-	private final boolean isDetached;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
-		ClusterClient<?> client,
-		List<URL> jarFilesToAttach,
-		List<URL> classpathsToAttach,
-		ClassLoader userCodeClassLoader,
-		int defaultParallelism,
-		boolean isDetached,
-		SavepointRestoreSettings savepointSettings,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = client;
-		this.jarFilesToAttach = jarFilesToAttach;
-		this.classpathsToAttach = classpathsToAttach;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.defaultParallelism = defaultParallelism;
-		this.isDetached = isDetached;
-		this.savepointSettings = savepointSettings;
+			final Configuration configuration,
+			final ClusterClient<?> client,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		this.configuration = checkNotNull(configuration);
+		this.client = checkNotNull(client);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
 		this.alreadyCalled = false;
-		this.jobExecutionResult = jobExecutionResult;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
-
-		final ContextEnvironment environment = new ContextEnvironment(
-			client,
-			jarFilesToAttach,
-			classpathsToAttach,
-			userCodeClassLoader,
-			savepointSettings,
-			isDetached,
-			jobExecutionResult);
-		if (defaultParallelism > 0) {
-			environment.setParallelism(defaultParallelism);
-		}
-		return environment;
+		return new ContextEnvironment(
+				configuration,
+				client,
+				userCodeClassLoader,
+				jobExecutionResult);
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-		if (isDetached && alreadyCalled) {
+		if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
 			throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
 		}
 		alreadyCalled = true;
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
 		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+		when(packagedProgramMock.getUserCodeClassLoader())
+				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {