You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:20 UTC
[flink] 07/07: [FLINK-14745] Wire the configuration to the ClientUtils.executeProgram
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 03c125c41f12c1f27fe61a320c5a04af200e4ec0
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 13 16:23:03 2019 +0100
[FLINK-14745] Wire the configuration to the ClientUtils.executeProgram
---
.../java/org/apache/flink/client/ClientUtils.java | 29 ++++++++++++++------
.../org/apache/flink/client/cli/CliFrontend.java | 12 +++-----
.../flink/client/cli/CliFrontendRunTest.java | 7 +++--
.../apache/flink/client/program/ClientTest.java | 32 ++++++++++++++++++----
4 files changed, 55 insertions(+), 25 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 043b740..02b73a9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -27,9 +28,11 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
@@ -133,13 +136,23 @@ public enum ClientUtils {
}
public static JobSubmissionResult executeProgram(
+ Configuration configuration,
ClusterClient<?> client,
- PackagedProgram program,
- int parallelism,
- boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+ PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+
+ final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+ final List<URL> jobJars = executionConfigAccessor.getJars();
+ final List<URL> classpaths = executionConfigAccessor.getClasspaths();
+ final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
+ final int parallelism = executionConfigAccessor.getParallelism();
+ final boolean detached = executionConfigAccessor.getDetachedMode();
+
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ final ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jobJars, classpaths, contextClassLoader);
+
try {
- Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());
+ Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})", detached);
@@ -147,12 +160,12 @@ public enum ClientUtils {
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
client,
- program.getJobJarAndDependencies(),
- program.getClasspaths(),
- program.getUserCodeClassLoader(),
+ jobJars,
+ classpaths,
+ userCodeClassLoader,
parallelism,
detached,
- program.getSavepointSettings(),
+ savepointSettings,
jobExecutionResult);
ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9ff6d53..5d24aee 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -278,11 +278,8 @@ public class CliFrontend {
try {
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
- userParallelism = defaultParallelism;
- }
- executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
+ executeProgram(executionConfig, program, client);
} finally {
if (clusterId == null && !executionParameters.getDetachedMode()) {
// terminate the cluster only if we have started it before and if it's not detached
@@ -742,13 +739,12 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected void executeProgram(
+ Configuration configuration,
PackagedProgram program,
- ClusterClient<?> client,
- int parallelism,
- boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+ ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
logAndSysout("Starting execution of program");
- JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism, detached);
+ JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 449e1b2..a0d551b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -198,9 +198,10 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
}
@Override
- protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism, boolean detached) {
- assertEquals(isDetached, detached);
- assertEquals(expectedParallelism, parallelism);
+ protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+ final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+ assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
+ assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
}
}
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index a1cc8a2..5845080 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,8 +31,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -52,6 +56,7 @@ import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.net.URL;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
@@ -92,6 +97,15 @@ public class ClientTest extends TestLogger {
config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
+ private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
+ final Configuration configuration = new Configuration();
+ configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ configuration.set(DeploymentOptions.ATTACHED, !detached);
+ ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
+ ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
+ return configuration;
+ }
+
/**
* Tests that invalid detached mode programs fail.
*/
@@ -100,7 +114,8 @@ public class ClientTest extends TestLogger {
final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
- ClientUtils.executeProgram(clusterClient, prg, 1, true);
+ final Configuration configuration = fromPackagedProgram(prg, 1, true);
+ ClientUtils.executeProgram(configuration, clusterClient, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -110,7 +125,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
- ClientUtils.executeProgram(clusterClient, prg, 1, true);
+ final Configuration configuration = fromPackagedProgram(prg, 1, true);
+ ClientUtils.executeProgram(configuration, clusterClient, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -120,7 +136,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
- ClientUtils.executeProgram(clusterClient, prg, 1, true);
+ final Configuration configuration = fromPackagedProgram(prg, 1, true);
+ ClientUtils.executeProgram(configuration, clusterClient, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -130,7 +147,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
- ClientUtils.executeProgram(clusterClient, prg, 1, true);
+ final Configuration configuration = fromPackagedProgram(prg, 1, true);
+ ClientUtils.executeProgram(configuration, clusterClient, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -140,7 +158,8 @@ public class ClientTest extends TestLogger {
try {
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
- ClientUtils.executeProgram(clusterClient, prg, 1, true);
+ final Configuration configuration = fromPackagedProgram(prg, 1, true);
+ ClientUtils.executeProgram(configuration, clusterClient, prg);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
@@ -184,7 +203,8 @@ public class ClientTest extends TestLogger {
try {
final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
- ClientUtils.executeProgram(client, packagedProgramMock, 1, true);
+ final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
+ ClientUtils.executeProgram(configuration, client, packagedProgramMock);
fail("Creating the local execution environment should not be possible");
}
catch (InvalidProgramException e) {