You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/12 07:59:02 UTC
[flink] 22/24: possible fix
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7ef9ed387334ca2bc9034d4fba82406e60a9fa9
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 11 14:58:45 2019 +0100
possible fix
---
.../java/org/apache/flink/client/ClientUtils.java | 5 -
.../org/apache/flink/client/cli/CliFrontend.java | 24 +++--
.../flink/client/cli/ExecutionConfigAccessor.java | 4 +-
.../org/apache/flink/api/java/FlinkILoopTest.java | 111 ++-------------------
4 files changed, 21 insertions(+), 123 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 96a02ea..b5537e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -29,10 +29,8 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
-import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -185,9 +183,6 @@ public enum ClientUtils {
final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
- // TODO: 11.11.19 this should move to the appropriate place
-// ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
-
final ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
executorServiceLoader,
configuration,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index f53095b..780eadd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -35,10 +35,12 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
@@ -176,7 +178,6 @@ public class CliFrontend {
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final ProgramOptions programOptions = new ProgramOptions(commandLine);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
@@ -186,11 +187,12 @@ public class CliFrontend {
if (!programOptions.isPython()) {
// Java program should be specified a JAR file
- if (executionParameters.getJarFilePaths().isEmpty()) {
+ if (programOptions.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}
+ final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
@@ -201,8 +203,7 @@ public class CliFrontend {
}
final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
-
- final Configuration effectiveConfig = getEffectiveConfiguration(commandLine, executionParameters, customCommandLine);
+ final Configuration effectiveConfig = getEffectiveConfiguration(program, customCommandLine, commandLine, executionParameters);
try {
execute(effectiveConfig, program);
} finally {
@@ -214,13 +215,14 @@ public class CliFrontend {
ClientUtils.runProgram(clusterClientServiceLoader, configuration, program);
}
- private Configuration getEffectiveConfiguration(CommandLine commandLine, ExecutionConfigAccessor executionParameters, CustomCommandLine customCommandLine) throws FlinkException {
- // TODO: 01.11.19 all this should be merged nicely, e.g. executionParameters can
- // take an already existing configuration as a parameter.
- final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
- final Configuration executionConfig = executionParameters.getConfiguration();
- final Configuration effectiveConfig = new Configuration(executorConfig);
- effectiveConfig.addAll(executionConfig);
+ private Configuration getEffectiveConfiguration(
+ final PackagedProgram program,
+ final CustomCommandLine customCommandLine,
+ final CommandLine commandLine,
+ final ExecutionConfigAccessor executionParameters) throws FlinkException {
+ final Configuration effectiveConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+ executionParameters.applyToConfiguration(effectiveConfig);
+ ConfigUtils.encodeStreamToConfig(effectiveConfig, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
return effectiveConfig;
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index e82dd36..05c3946 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -95,8 +95,8 @@ public class ExecutionConfigAccessor {
}
}
- public Configuration getConfiguration() {
- return configuration;
+ void applyToConfiguration(final Configuration effectiveConfig) {
+ effectiveConfig.addAll(configuration);
}
public List<URL> getJarFilePaths() {
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 19bc2a0..b86d5f4 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -18,10 +18,6 @@
package org.apache.flink.api.java;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
@@ -29,22 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.BDDMockito;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.net.URL;
-import java.util.List;
import scala.Option;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.settings.MutableSettings;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -52,49 +34,24 @@ import static org.junit.Assert.assertTrue;
/**
* Integration tests for {@link FlinkILoop}.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(PlanExecutor.class)
-@PowerMockIgnore("javax.tools.*")
public class FlinkILoopTest extends TestLogger {
@Test
- public void testConfigurationForwarding() throws Exception {
+ public void testConfigurationForwarding() {
Configuration configuration = new Configuration();
configuration.setString("foobar", "foobar");
- FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
- final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
-
- PowerMockito.mockStatic(PlanExecutor.class);
- BDDMockito.given(PlanExecutor.createRemoteExecutor(
- Matchers.anyString(),
- Matchers.anyInt(),
- Matchers.any(Configuration.class)
- )).willAnswer(new Answer<PlanExecutor>() {
- @Override
- public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
- testPlanExecutor.setHost((String) invocation.getArguments()[0]);
- testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
- testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
- return testPlanExecutor;
- }
- });
-
- Settings settings = new Settings();
- ((MutableSettings.BooleanSetting) settings.usejavacp()).value_$eq(true);
-
- flinkILoop.settings_$eq(settings);
- flinkILoop.createInterpreter();
+ FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv();
- env.fromElements(1).output(new DiscardingOutputFormat<Integer>());
+ assertTrue(env instanceof RemoteEnvironment);
- env.execute("Test job");
+ RemoteEnvironment remoteEnv = (RemoteEnvironment) env;
- Configuration forwardedConfiguration = testPlanExecutor.getConfiguration();
+ Configuration configUnderTest = remoteEnv.getExecutorConfiguration();
- assertEquals(configuration, forwardedConfiguration);
+ assertEquals(configuration, configUnderTest);
}
@Test
@@ -114,60 +71,4 @@ public class FlinkILoopTest extends TestLogger {
assertEquals(configuration, forwardedConfiguration);
}
-
- static class TestPlanExecutor extends PlanExecutor {
-
- private String host;
- private int port;
- private Configuration configuration;
- private List<String> jars;
- private List<String> globalClasspaths;
-
- @Override
- public JobExecutionResult executePlan(
- Pipeline plan, List<URL> jarFiles, List<URL> globalClasspaths) throws Exception {
- return null;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(Configuration configuration) {
- this.configuration = configuration;
- }
-
- public List<String> getJars() {
- return jars;
- }
-
- public void setJars(List<String> jars) {
- this.jars = jars;
- }
-
- public List<String> getGlobalClasspaths() {
- return globalClasspaths;
- }
-
- public void setGlobalClasspaths(List<String> globalClasspaths) {
- this.globalClasspaths = globalClasspaths;
- }
- }
-
}