You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/08/24 17:45:07 UTC

[flink] branch release-1.11 updated: Revert "[FLINK-18742][cli] Respect all config args when creating packaged program at client"

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d9af11d3 Revert "[FLINK-18742][cli] Respect all config args when creating packaged program at client"
d9af11d3 is described below

commit d9af11d372cc76efefed50f544f67c45223b8bb5
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon Aug 24 19:25:33 2020 +0200

    Revert "[FLINK-18742][cli] Respect all config args when creating packaged program at client"
    
    This reverts commit dad18a52a1a960efc7be3139abbbf9cae5b6d99f.
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  47 ++-----
 .../flink/client/program/PackagedProgram.java      |  32 +----
 .../cli/CliFrontendDynamicPropertiesTest.java      | 152 ---------------------
 .../librarycache/FlinkUserCodeClassLoaders.java    |   2 +-
 4 files changed, 11 insertions(+), 222 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c1fc7aa..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -219,15 +219,15 @@ public class CliFrontend {
 
 		final ProgramOptions programOptions = ProgramOptions.create(commandLine);
 
-		final List<URL> jobJars = getJobJarAndDependencies(programOptions);
+		final PackagedProgram program =
+				getPackagedProgram(programOptions);
 
+		final List<URL> jobJars = program.getJobJarAndDependencies();
 		final Configuration effectiveConfiguration = getEffectiveConfiguration(
 				activeCommandLine, commandLine, programOptions, jobJars);
 
 		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
 
-		final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);
-
 		try {
 			executeProgram(effectiveConfiguration, program);
 		} finally {
@@ -235,28 +235,11 @@ public class CliFrontend {
 		}
 	}
 
-	/**
-	 * Get all provided libraries needed to run the program from the ProgramOptions.
-	 */
-	private List<URL> getJobJarAndDependencies(ProgramOptions programOptions) throws CliArgsException {
-		String entryPointClass = programOptions.getEntryPointClassName();
-		String jarFilePath = programOptions.getJarFilePath();
-
-		try {
-			File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
-			return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
-		} catch (FileNotFoundException | ProgramInvocationException e) {
-			throw new CliArgsException("Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
-		}
-	}
-
-	private PackagedProgram getPackagedProgram(
-			ProgramOptions programOptions,
-			Configuration effectiveConfiguration) throws ProgramInvocationException, CliArgsException {
+	private PackagedProgram getPackagedProgram(ProgramOptions programOptions) throws ProgramInvocationException, CliArgsException {
 		PackagedProgram program;
 		try {
 			LOG.info("Building program from JAR file");
-			program = buildProgram(programOptions, effectiveConfiguration);
+			program = buildProgram(programOptions);
 		} catch (FileNotFoundException e) {
 			throw new CliArgsException("Could not build the program from JAR file: " + e.getMessage(), e);
 		}
@@ -306,8 +289,7 @@ public class CliFrontend {
 		// -------- build the packaged program -------------
 
 		LOG.info("Building program from JAR file");
-
-		PackagedProgram program = null;
+		final PackagedProgram program = buildProgram(programOptions);
 
 		try {
 			int parallelism = programOptions.getParallelism();
@@ -321,9 +303,7 @@ public class CliFrontend {
 					validateAndGetActiveCommandLine(checkNotNull(commandLine));
 
 			final Configuration effectiveConfiguration = getEffectiveConfiguration(
-					activeCommandLine, commandLine, programOptions, getJobJarAndDependencies(programOptions));
-
-			program = buildProgram(programOptions, effectiveConfiguration);
+					activeCommandLine, commandLine, programOptions, program.getJobJarAndDependencies());
 
 			Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
 			String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
@@ -348,9 +328,7 @@ public class CliFrontend {
 			}
 		}
 		finally {
-			if (program != null) {
-				program.deleteExtractedLibraries();
-			}
+			program.deleteExtractedLibraries();
 		}
 	}
 
@@ -728,15 +706,6 @@ public class CliFrontend {
 	 */
 	PackagedProgram buildProgram(final ProgramOptions runOptions)
 			throws FileNotFoundException, ProgramInvocationException, CliArgsException {
-		return buildProgram(runOptions, configuration);
-	}
-
-	/**
-	 * Creates a Packaged program from the given command line options and the effectiveConfiguration.
-	 *
-	 * @return A PackagedProgram (upon success)
-	 */
-	PackagedProgram buildProgram(final ProgramOptions runOptions, final Configuration configuration) throws FileNotFoundException, ProgramInvocationException, CliArgsException {
 		runOptions.validate();
 
 		String[] programArgs = runOptions.getProgramArgs();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 0a33696..2fea7e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -220,12 +220,12 @@ public class PackagedProgram {
 	 * Returns all provided libraries needed to run the program.
 	 */
 	public List<URL> getJobJarAndDependencies() {
-		List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
+		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 
 		if (jarFile != null) {
 			libs.add(jarFile);
 		}
-		for (File tmpLib : extractedTempLibraries) {
+		for (File tmpLib : this.extractedTempLibraries) {
 			try {
 				libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
 			} catch (MalformedURLException e) {
@@ -241,34 +241,6 @@ public class PackagedProgram {
 	}
 
 	/**
-	 * Returns all provided libraries needed to run the program.
-	 */
-	public static List<URL> getJobJarAndDependencies(File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
-		URL jarFileUrl = loadJarFile(jarFile);
-
-		List<File> extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
-
-		List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
-
-		if (jarFileUrl != null) {
-			libs.add(jarFileUrl);
-		}
-		for (File tmpLib : extractedTempLibraries) {
-			try {
-				libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
-			} catch (MalformedURLException e) {
-				throw new RuntimeException("URL is invalid. This should not happen.", e);
-			}
-		}
-
-		if (isPython(entryPointClassName)) {
-			libs.add(PackagedProgramUtils.getPythonJar());
-		}
-
-		return libs;
-	}
-
-	/**
 	 * Deletes all temporary files created for contained packaged libraries.
 	 */
 	public void deleteExtractedLibraries() {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
deleted file mode 100644
index b21db47..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
-import org.apache.flink.util.ChildFirstClassLoader;
-
-import org.apache.commons.cli.Options;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.Collections;
-
-import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
-import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * Tests for the RUN command with Dynamic Properties.
- */
-public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
-
-	private GenericCLI cliUnderTest;
-	private Configuration configuration;
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	@BeforeClass
-	public static void init() {
-		CliFrontendTestUtils.pipeSystemOutToNull();
-	}
-
-	@AfterClass
-	public static void shutdown() {
-		CliFrontendTestUtils.restoreSystemOut();
-	}
-
-	@Before
-	public void setup() {
-		Options testOptions = new Options();
-		configuration = new Configuration();
-		configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
-
-		cliUnderTest = new GenericCLI(
-			configuration,
-			tmp.getRoot().getAbsolutePath());
-
-		cliUnderTest.addGeneralOptions(testOptions);
-	}
-
-	@Test
-	public void testDynamicPropertiesWithParentFirstClassloader() throws Exception {
-
-		String[] args = {
-			"-e", "test-executor",
-			"-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
-			"-D" + "classloader.resolve-order=parent-first",
-			getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
-		};
-
-		verifyCliFrontend(configuration, args, cliUnderTest, "parent-first", ParentFirstClassLoader.class.getName());
-	}
-
-	@Test
-	public void testDynamicPropertiesWithDefaultChildFirstClassloader() throws Exception {
-
-		String[] args = {
-			"-e", "test-executor",
-			"-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
-			getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
-		};
-
-		verifyCliFrontend(configuration, args, cliUnderTest, "child-first", ChildFirstClassLoader.class.getName());
-	}
-
-	@Test
-	public void testDynamicPropertiesWithChildFirstClassloader() throws Exception {
-
-		String[] args = {
-			"-e", "test-executor",
-			"-D" + CoreOptions.DEFAULT_PARALLELISM.key() + "=5",
-			"-D" + "classloader.resolve-order=child-first",
-			getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"
-		};
-
-		verifyCliFrontend(configuration, args, cliUnderTest, "child-first", ChildFirstClassLoader.class.getName());
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static void verifyCliFrontend(
-			Configuration configuration,
-			String[] parameters,
-			GenericCLI cliUnderTest,
-			String expectedResolveOrderOption,
-			String userCodeClassLoaderClassName) throws Exception {
-		TestingCliFrontend testFrontend =
-			new TestingCliFrontend(configuration, cliUnderTest, expectedResolveOrderOption, userCodeClassLoaderClassName);
-		testFrontend.run(parameters); // verifies the expected values (see below)
-	}
-
-	private static final class TestingCliFrontend extends CliFrontend {
-
-		private final String expectedResolveOrder;
-
-		private final String userCodeClassLoaderClassName;
-
-		private TestingCliFrontend(
-				Configuration configuration,
-				GenericCLI cliUnderTest,
-				String expectedResolveOrderOption,
-				String userCodeClassLoaderClassName) {
-			super(
-				configuration,
-				Collections.singletonList(cliUnderTest));
-			this.expectedResolveOrder = expectedResolveOrderOption;
-			this.userCodeClassLoaderClassName = userCodeClassLoaderClassName;
-		}
-
-		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program) {
-			assertEquals(TEST_JAR_MAIN_CLASS, program.getMainClassName());
-			assertEquals(expectedResolveOrder, configuration.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER));
-			assertEquals(userCodeClassLoaderClassName, program.getUserCodeClassLoader().getClass().getName());
-		}
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
index 66efd82..7f1c163 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -86,7 +86,7 @@ public class FlinkUserCodeClassLoaders {
 	/**
 	 * Regular URLClassLoader that first loads from the parent and only after that from the URLs.
 	 */
-	public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
+	static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
 
 		ParentFirstClassLoader(URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {
 			super(urls, parent, classLoadingExceptionHandler);