You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:20 UTC

[flink] 07/07: [FLINK-14745] Wire the configuration to the ClientUtils.executeProgram

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03c125c41f12c1f27fe61a320c5a04af200e4ec0
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 13 16:23:03 2019 +0100

    [FLINK-14745] Wire the configuration to the ClientUtils.executeProgram
---
 .../java/org/apache/flink/client/ClientUtils.java  | 29 ++++++++++++++------
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +++-----
 .../flink/client/cli/CliFrontendRunTest.java       |  7 +++--
 .../apache/flink/client/program/ClientTest.java    | 32 ++++++++++++++++++----
 4 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 043b740..02b73a9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -27,9 +28,11 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -133,13 +136,23 @@ public enum ClientUtils {
 	}
 
 	public static JobSubmissionResult executeProgram(
+			Configuration configuration,
 			ClusterClient<?> client,
-			PackagedProgram program,
-			int parallelism,
-			boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+		final List<URL> jobJars = executionConfigAccessor.getJars();
+		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
+		final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
+		final int parallelism = executionConfigAccessor.getParallelism();
+		final boolean detached = executionConfigAccessor.getDetachedMode();
+
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+		final ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jobJars, classpaths, contextClassLoader);
+
 		try {
-			Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());
+			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
 			LOG.info("Starting program (detached: {})", detached);
 
@@ -147,12 +160,12 @@ public enum ClientUtils {
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
 				client,
-				program.getJobJarAndDependencies(),
-				program.getClasspaths(),
-				program.getUserCodeClassLoader(),
+				jobJars,
+				classpaths,
+				userCodeClassLoader,
 				parallelism,
 				detached,
-				program.getSavepointSettings(),
+				savepointSettings,
 				jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9ff6d53..5d24aee 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -278,11 +278,8 @@ public class CliFrontend {
 				try {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
-					if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
-						userParallelism = defaultParallelism;
-					}
 
-					executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
+					executeProgram(executionConfig, program, client);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
@@ -742,13 +739,12 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected void executeProgram(
+			Configuration configuration,
 			PackagedProgram program,
-			ClusterClient<?> client,
-			int parallelism,
-			boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+			ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism, detached);
+		JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 449e1b2..a0d551b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -198,9 +198,10 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism, boolean detached) {
-			assertEquals(isDetached, detached);
-			assertEquals(expectedParallelism, parallelism);
+		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
+			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
 		}
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index a1cc8a2..5845080 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,8 +31,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -52,6 +56,7 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -92,6 +97,15 @@ public class ClientTest extends TestLogger {
 		config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
 	}
 
+	private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
+		final Configuration configuration = new Configuration();
+		configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+		configuration.set(DeploymentOptions.ATTACHED, !detached);
+		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
+		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
+		return configuration;
+	}
+
 	/**
 	 * Tests that invalid detached mode programs fail.
 	 */
@@ -100,7 +114,8 @@ public class ClientTest extends TestLogger {
 		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
-			ClientUtils.executeProgram(clusterClient, prg, 1, true);
+			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+			ClientUtils.executeProgram(configuration, clusterClient, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -110,7 +125,8 @@ public class ClientTest extends TestLogger {
 
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
-			ClientUtils.executeProgram(clusterClient, prg, 1, true);
+			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+			ClientUtils.executeProgram(configuration, clusterClient, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -120,7 +136,8 @@ public class ClientTest extends TestLogger {
 
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
-			ClientUtils.executeProgram(clusterClient, prg, 1, true);
+			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+			ClientUtils.executeProgram(configuration, clusterClient, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -130,7 +147,8 @@ public class ClientTest extends TestLogger {
 
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
-			ClientUtils.executeProgram(clusterClient, prg, 1, true);
+			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+			ClientUtils.executeProgram(configuration, clusterClient, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -140,7 +158,8 @@ public class ClientTest extends TestLogger {
 
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
-			ClientUtils.executeProgram(clusterClient, prg, 1, true);
+			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+			ClientUtils.executeProgram(configuration, clusterClient, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -184,7 +203,8 @@ public class ClientTest extends TestLogger {
 
 		try {
 			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-			ClientUtils.executeProgram(client, packagedProgramMock, 1, true);
+			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
+			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
 			fail("Creating the local execution environment should not be possible");
 		}
 		catch (InvalidProgramException e) {