You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/14 09:48:17 UTC
[flink] 04/07: [hotfix] CliFrontend.buildProgram() uses only
ProgramOptions
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cffa2e8647f74e0332cad011d937015ac9ac031e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 12 09:12:37 2019 +0100
[hotfix] CliFrontend.buildProgram() uses only ProgramOptions
---
.../org/apache/flink/client/cli/CliFrontend.java | 21 +++++----
.../flink/client/cli/ExecutionConfigAccessor.java | 26 -----------
.../client/cli/CliFrontendPackageProgramTest.java | 50 +++++++++-------------
3 files changed, 31 insertions(+), 66 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9552978..6e9b2f9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -185,7 +185,6 @@ public class CliFrontend {
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final ProgramOptions programOptions = new ProgramOptions(commandLine);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
@@ -195,7 +194,7 @@ public class CliFrontend {
if (!programOptions.isPython()) {
// Java program should be specified a JAR file
- if (executionParameters.getJarFilePath() == null) {
+ if (programOptions.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}
@@ -203,7 +202,7 @@ public class CliFrontend {
final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
- program = buildProgram(programOptions, executionParameters);
+ program = buildProgram(programOptions);
}
catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file.", e);
@@ -211,7 +210,10 @@ public class CliFrontend {
final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+
+ final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
final Configuration executionConfig = executionParameters.getConfiguration();
+
try {
runProgram(executorConfig, executionConfig, program);
} finally {
@@ -322,7 +324,6 @@ public class CliFrontend {
final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true);
final ProgramOptions programOptions = new ProgramOptions(commandLine);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
@@ -337,7 +338,7 @@ public class CliFrontend {
// -------- build the packaged program -------------
LOG.info("Building program from JAR file");
- final PackagedProgram program = buildProgram(programOptions, executionParameters);
+ final PackagedProgram program = buildProgram(programOptions);
try {
int parallelism = programOptions.getParallelism();
@@ -768,12 +769,10 @@ public class CliFrontend {
*
* @return A PackagedProgram (upon success)
*/
- PackagedProgram buildProgram(
- final ProgramOptions runOptions,
- final ExecutionConfigAccessor executionParameters) throws FileNotFoundException, ProgramInvocationException {
+ PackagedProgram buildProgram(final ProgramOptions runOptions) throws FileNotFoundException, ProgramInvocationException {
String[] programArgs = runOptions.getProgramArgs();
- String jarFilePath = executionParameters.getJarFilePath();
- List<URL> classpaths = executionParameters.getClasspaths();
+ String jarFilePath = runOptions.getJarFilePath();
+ List<URL> classpaths = runOptions.getClasspaths();
// Get assembler class
String entryPointClass = runOptions.getEntryPointClassName();
@@ -803,7 +802,7 @@ public class CliFrontend {
.setArguments(programArgs)
.build();
- program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
+ program.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings());
return program;
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 77c0440..ee32449 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -27,10 +27,8 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.Collections;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,39 +65,15 @@ public class ExecutionConfigAccessor {
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
- parseJarURLToConfig(options.getJarFilePath(), configuration);
-
SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
return new ExecutionConfigAccessor(configuration);
}
- private static void parseJarURLToConfig(final String jarFile, final Configuration configuration) {
- if (jarFile == null) {
- return;
- }
-
- try {
- final URL jarUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
- final List<URL> jarUrlSingleton = Collections.singletonList(jarUrl);
- ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jarUrlSingleton, URL::toString);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("JAR file path invalid", e);
- }
- }
-
public Configuration getConfiguration() {
return configuration;
}
- public String getJarFilePath() {
- final List<URL> jarURL = decodeUrlList(configuration, PipelineOptions.JARS);
- if (jarURL != null && !jarURL.isEmpty()) {
- return jarURL.get(0).getPath();
- }
- return null;
- }
-
public List<URL> getClasspaths() {
return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
index 2b64c82..75771cc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
@@ -80,11 +80,10 @@ public class CliFrontendPackageProgramTest extends TestLogger {
@Test
public void testNonExistingJarFile() throws Exception {
ProgramOptions programOptions = mock(ProgramOptions.class);
- ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class);
- when(executionOptions.getJarFilePath()).thenReturn("/some/none/existing/path");
+ when(programOptions.getJarFilePath()).thenReturn("/some/none/existing/path");
try {
- frontend.buildProgram(programOptions, executionOptions);
+ frontend.buildProgram(programOptions);
fail("should throw an exception");
}
catch (FileNotFoundException e) {
@@ -95,12 +94,11 @@ public class CliFrontendPackageProgramTest extends TestLogger {
@Test
public void testFileNotJarFile() throws Exception {
ProgramOptions programOptions = mock(ProgramOptions.class);
- ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class);
- when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath());
+ when(programOptions.getJarFilePath()).thenReturn(getNonJarFilePath());
when(programOptions.getProgramArgs()).thenReturn(new String[0]);
try {
- frontend.buildProgram(programOptions, executionOptions);
+ frontend.buildProgram(programOptions);
fail("should throw an exception");
}
catch (ProgramInvocationException e) {
@@ -120,13 +118,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
- assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+ assertEquals(getTestJarPath(), programOptions.getJarFilePath());
+ assertArrayEquals(classpath, programOptions.getClasspaths().toArray());
assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
- PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
+ PackagedProgram prog = frontend.buildProgram(programOptions);
Assert.assertArrayEquals(reducedArguments, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -144,13 +141,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
- assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+ assertEquals(getTestJarPath(), programOptions.getJarFilePath());
+ assertArrayEquals(classpath, programOptions.getClasspaths().toArray());
assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
- PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
+ PackagedProgram prog = frontend.buildProgram(programOptions);
Assert.assertArrayEquals(reducedArguments, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -168,13 +164,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
- assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+ assertEquals(getTestJarPath(), programOptions.getJarFilePath());
+ assertArrayEquals(classpath, programOptions.getClasspaths().toArray());
assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
- PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
+ PackagedProgram prog = frontend.buildProgram(programOptions);
Assert.assertArrayEquals(reducedArguments, prog.getArguments());
Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -197,14 +192,13 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(arguments[4], executionOptions.getJarFilePath());
- assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+ assertEquals(arguments[4], programOptions.getJarFilePath());
+ assertArrayEquals(classpath, programOptions.getClasspaths().toArray());
assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
try {
- frontend.buildProgram(programOptions, executionOptions);
+ frontend.buildProgram(programOptions);
fail("Should fail with an exception");
}
catch (FileNotFoundException e) {
@@ -218,13 +212,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(arguments[0], executionOptions.getJarFilePath());
+ assertEquals(arguments[0], programOptions.getJarFilePath());
assertArrayEquals(new String[0], programOptions.getProgramArgs());
try {
- frontend.buildProgram(programOptions, executionOptions);
+ frontend.buildProgram(programOptions);
}
catch (FileNotFoundException e) {
// that's what we want
@@ -279,14 +272,13 @@ public class CliFrontendPackageProgramTest extends TestLogger {
CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
ProgramOptions programOptions = new ProgramOptions(commandLine);
- ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
- assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
- assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+ assertEquals(getTestJarPath(), programOptions.getJarFilePath());
+ assertArrayEquals(classpath, programOptions.getClasspaths().toArray());
assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, programOptions.getEntryPointClassName());
assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
- PackagedProgram prog = spy(frontend.buildProgram(programOptions, executionOptions));
+ PackagedProgram prog = spy(frontend.buildProgram(programOptions));
ClassLoader testClassLoader = new ClassLoader(prog.getUserCodeClassLoader()) {
@Override