You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/08/24 17:45:07 UTC
[flink] branch release-1.11 updated: Revert "[FLINK-18742][cli]
Respect all config args when creating packaged program at client"
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new d9af11d3 Revert "[FLINK-18742][cli] Respect all config args when creating packaged program at client"
d9af11d3 is described below
commit d9af11d372cc76efefed50f544f67c45223b8bb5
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon Aug 24 19:25:33 2020 +0200
Revert "[FLINK-18742][cli] Respect all config args when creating packaged program at client"
This reverts commit dad18a52a1a960efc7be3139abbbf9cae5b6d99f.
---
.../org/apache/flink/client/cli/CliFrontend.java | 47 ++-----
.../flink/client/program/PackagedProgram.java | 32 +----
.../cli/CliFrontendDynamicPropertiesTest.java | 152 ---------------------
.../librarycache/FlinkUserCodeClassLoaders.java | 2 +-
4 files changed, 11 insertions(+), 222 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c1fc7aa..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -219,15 +219,15 @@ public class CliFrontend {
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
- final List<URL> jobJars = getJobJarAndDependencies(programOptions);
+ final PackagedProgram program =
+ getPackagedProgram(programOptions);
+ final List<URL> jobJars = program.getJobJarAndDependencies();
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
- final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);
-
try {
executeProgram(effectiveConfiguration, program);
} finally {
@@ -235,28 +235,11 @@ public class CliFrontend {
}
}
- /**
- * Get all provided libraries needed to run the program from the ProgramOptions.
- */
- private List<URL> getJobJarAndDependencies(ProgramOptions programOptions) throws CliArgsException {
- String entryPointClass = programOptions.getEntryPointClassName();
- String jarFilePath = programOptions.getJarFilePath();
-
- try {
- File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
- return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
- } catch (FileNotFoundException | ProgramInvocationException e) {
- throw new CliArgsException("Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
- }
- }
-
- private PackagedProgram getPackagedProgram(
- ProgramOptions programOptions,
- Configuration effectiveConfiguration) throws ProgramInvocationException, CliArgsException {
+ private PackagedProgram getPackagedProgram(ProgramOptions programOptions) throws ProgramInvocationException, CliArgsException {
PackagedProgram program;
try {
LOG.info("Building program from JAR file");
- program = buildProgram(programOptions, effectiveConfiguration);
+ program = buildProgram(programOptions);
} catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file: " + e.getMessage(), e);
}
@@ -306,8 +289,7 @@ public class CliFrontend {
// -------- build the packaged program -------------
LOG.info("Building program from JAR file");
-
- PackagedProgram program = null;
+ final PackagedProgram program = buildProgram(programOptions);
try {
int parallelism = programOptions.getParallelism();
@@ -321,9 +303,7 @@ public class CliFrontend {
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final Configuration effectiveConfiguration = getEffectiveConfiguration(
- activeCommandLine, commandLine, programOptions, getJobJarAndDependencies(programOptions));
-
- program = buildProgram(programOptions, effectiveConfiguration);
+ activeCommandLine, commandLine, programOptions, program.getJobJarAndDependencies());
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
@@ -348,9 +328,7 @@ public class CliFrontend {
}
}
finally {
- if (program != null) {
- program.deleteExtractedLibraries();
- }
+ program.deleteExtractedLibraries();
}
}
@@ -728,15 +706,6 @@ public class CliFrontend {
*/
PackagedProgram buildProgram(final ProgramOptions runOptions)
throws FileNotFoundException, ProgramInvocationException, CliArgsException {
- return buildProgram(runOptions, configuration);
- }
-
- /**
- * Creates a Packaged program from the given command line options and the effectiveConfiguration.
- *
- * @return A PackagedProgram (upon success)
- */
- PackagedProgram buildProgram(final ProgramOptions runOptions, final Configuration configuration) throws FileNotFoundException, ProgramInvocationException, CliArgsException {
runOptions.validate();
String[] programArgs = runOptions.getProgramArgs();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 0a33696..2fea7e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -220,12 +220,12 @@ public class PackagedProgram {
* Returns all provided libraries needed to run the program.
*/
public List<URL> getJobJarAndDependencies() {
- List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
+ List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
if (jarFile != null) {
libs.add(jarFile);
}
- for (File tmpLib : extractedTempLibraries) {
+ for (File tmpLib : this.extractedTempLibraries) {
try {
libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
} catch (MalformedURLException e) {
@@ -241,34 +241,6 @@ public class PackagedProgram {
}
/**
- * Returns all provided libraries needed to run the program.
- */
- public static List<URL> getJobJarAndDependencies(File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
- URL jarFileUrl = loadJarFile(jarFile);
-
- List<File> extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
-
- List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
-
- if (jarFileUrl != null) {
- libs.add(jarFileUrl);
- }
- for (File tmpLib : extractedTempLibraries) {
- try {
- libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
- } catch (MalformedURLException e) {
- throw new RuntimeException("URL is invalid. This should not happen.", e);
- }
- }
-
- if (isPython(entryPointClassName)) {
- libs.add(PackagedProgramUtils.getPythonJar());
- }
-
- return libs;
- }
-
- /**
* Deletes all temporary files created for contained packaged libraries.
*/
public void deleteExtractedLibraries() {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
deleted file mode 100644
index b21db47..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
-import org.apache.flink.util.ChildFirstClassLoader;
-
-import org.apache.commons.cli.Options;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.Collections;
-
-import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
-import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * Tests for the RUN command with Dynamic Properties.
- */
-public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
-
- private GenericCLI cliUnderTest;
- private Configuration configuration;
-
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
-
- @BeforeClass
- public static void init() {
- CliFrontendTestUtils.pipeSystemOutToNull();
- }
-
- @AfterClass
- public static void shutdown() {
- CliFrontendTestUtils.restoreSystemOut();
- }
-
- @Before
- public void setup() {
- Options testOptions = new Options();
- configuration = new Configuration();
- configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
-
- cliUnderTest = new GenericCLI(
- configuration,
- tmp.getRoot().getAbsolutePath());
-
- cliUnderTest.addGeneralOptions(testOptions);
- }
-
- @Test
- public void testDynamicPropertiesWithParentFirstClassloader() throws Exception {
-
- String[] args = {
- "-e", "test-executor",
- "-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
- "-D" + "classloader.resolve-order=parent-first",
- getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
- };
-
- verifyCliFrontend(configuration, args, cliUnderTest, "parent-first", ParentFirstClassLoader.class.getName());
- }
-
- @Test
- public void testDynamicPropertiesWithDefaultChildFirstClassloader() throws Exception {
-
- String[] args = {
- "-e", "test-executor",
- "-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
- getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
- };
-
- verifyCliFrontend(configuration, args, cliUnderTest, "child-first", ChildFirstClassLoader.class.getName());
- }
-
- @Test
- public void testDynamicPropertiesWithChildFirstClassloader() throws Exception {
-
- String[] args = {
- "-e", "test-executor",
- "-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
- "-D" + "classloader.resolve-order=child-first",
- getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
- };
-
- verifyCliFrontend(configuration, args, cliUnderTest, "child-first", ChildFirstClassLoader.class.getName());
- }
-
- // --------------------------------------------------------------------------------------------
-
- public static void verifyCliFrontend(
- Configuration configuration,
- String[] parameters,
- GenericCLI cliUnderTest,
- String expectedResolveOrderOption,
- String userCodeClassLoaderClassName) throws Exception {
- TestingCliFrontend testFrontend =
- new TestingCliFrontend(configuration, cliUnderTest, expectedResolveOrderOption, userCodeClassLoaderClassName);
- testFrontend.run(parameters); // verifies the expected values (see below)
- }
-
- private static final class TestingCliFrontend extends CliFrontend {
-
- private final String expectedResolveOrder;
-
- private final String userCodeClassLoaderClassName;
-
- private TestingCliFrontend(
- Configuration configuration,
- GenericCLI cliUnderTest,
- String expectedResolveOrderOption,
- String userCodeClassLoaderClassName) {
- super(
- configuration,
- Collections.singletonList(cliUnderTest));
- this.expectedResolveOrder = expectedResolveOrderOption;
- this.userCodeClassLoaderClassName = userCodeClassLoaderClassName;
- }
-
- @Override
- protected void executeProgram(Configuration configuration, PackagedProgram program) {
- assertEquals(TEST_JAR_MAIN_CLASS, program.getMainClassName());
- assertEquals(expectedResolveOrder, configuration.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER));
- assertEquals(userCodeClassLoaderClassName, program.getUserCodeClassLoader().getClass().getName());
- }
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
index 66efd82..7f1c163 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -86,7 +86,7 @@ public class FlinkUserCodeClassLoaders {
/**
* Regular URLClassLoader that first loads from the parent and only after that from the URLs.
*/
- public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
+ static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
ParentFirstClassLoader(URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {
super(urls, parent, classLoadingExceptionHandler);