You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/12 07:59:02 UTC

[flink] 22/24: possible fix

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7ef9ed387334ca2bc9034d4fba82406e60a9fa9
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 11 14:58:45 2019 +0100

    possible fix
---
 .../java/org/apache/flink/client/ClientUtils.java  |   5 -
 .../org/apache/flink/client/cli/CliFrontend.java   |  24 +++--
 .../flink/client/cli/ExecutionConfigAccessor.java  |   4 +-
 .../org/apache/flink/api/java/FlinkILoopTest.java  | 111 ++-------------------
 4 files changed, 21 insertions(+), 123 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 96a02ea..b5537e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -29,10 +29,8 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
-import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -185,9 +183,6 @@ public enum ClientUtils {
 
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
-			// TODO: 11.11.19 this should move to the appropriate place
-//			ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
-
 			final ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
 					executorServiceLoader,
 					configuration,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index f53095b..780eadd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -35,10 +35,12 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -176,7 +178,6 @@ public class CliFrontend {
 		final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
 
 		final ProgramOptions programOptions = new ProgramOptions(commandLine);
-		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
 
 		// evaluate help flag
 		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
@@ -186,11 +187,12 @@ public class CliFrontend {
 
 		if (!programOptions.isPython()) {
 			// Java program should be specified a JAR file
-			if (executionParameters.getJarFilePaths().isEmpty()) {
+			if (programOptions.getJarFilePath() == null) {
 				throw new CliArgsException("Java program should be specified a JAR file.");
 			}
 		}
 
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
 		final PackagedProgram program;
 		try {
 			LOG.info("Building program from JAR file");
@@ -201,8 +203,7 @@ public class CliFrontend {
 		}
 
 		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
-
-		final Configuration effectiveConfig = getEffectiveConfiguration(commandLine, executionParameters, customCommandLine);
+		final Configuration effectiveConfig = getEffectiveConfiguration(program, customCommandLine, commandLine, executionParameters);
 		try {
 			execute(effectiveConfig, program);
 		} finally {
@@ -214,13 +215,14 @@ public class CliFrontend {
 		ClientUtils.runProgram(clusterClientServiceLoader, configuration, program);
 	}
 
-	private Configuration getEffectiveConfiguration(CommandLine commandLine, ExecutionConfigAccessor executionParameters, CustomCommandLine customCommandLine) throws FlinkException {
-		// TODO: 01.11.19 all this should be merged nicely, e.g. executionParameters can
-		// take an already existing configuration as a parameter.
-		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-		final Configuration executionConfig = executionParameters.getConfiguration();
-		final Configuration effectiveConfig = new Configuration(executorConfig);
-		effectiveConfig.addAll(executionConfig);
+	private Configuration getEffectiveConfiguration(
+			final PackagedProgram program,
+			final CustomCommandLine customCommandLine,
+			final CommandLine commandLine,
+			final ExecutionConfigAccessor executionParameters) throws FlinkException {
+		final Configuration effectiveConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		executionParameters.applyToConfiguration(effectiveConfig);
+		ConfigUtils.encodeStreamToConfig(effectiveConfig, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
 		return effectiveConfig;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index e82dd36..05c3946 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -95,8 +95,8 @@ public class ExecutionConfigAccessor {
 		}
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
+	void applyToConfiguration(final Configuration effectiveConfig) {
+		effectiveConfig.addAll(configuration);
 	}
 
 	public List<URL> getJarFilePaths() {
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 19bc2a0..b86d5f4 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
@@ -29,22 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.BDDMockito;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.net.URL;
-import java.util.List;
 
 import scala.Option;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.settings.MutableSettings;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -52,49 +34,24 @@ import static org.junit.Assert.assertTrue;
 /**
  * Integration tests for {@link FlinkILoop}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(PlanExecutor.class)
-@PowerMockIgnore("javax.tools.*")
 public class FlinkILoopTest extends TestLogger {
 
 	@Test
-	public void testConfigurationForwarding() throws Exception {
+	public void testConfigurationForwarding() {
 		Configuration configuration = new Configuration();
 		configuration.setString("foobar", "foobar");
-		FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
 
-		final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
-
-		PowerMockito.mockStatic(PlanExecutor.class);
-		BDDMockito.given(PlanExecutor.createRemoteExecutor(
-			Matchers.anyString(),
-			Matchers.anyInt(),
-			Matchers.any(Configuration.class)
-		)).willAnswer(new Answer<PlanExecutor>() {
-			@Override
-			public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
-				testPlanExecutor.setHost((String) invocation.getArguments()[0]);
-				testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
-				testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
-				return testPlanExecutor;
-			}
-		});
-
-		Settings settings = new Settings();
-		((MutableSettings.BooleanSetting) settings.usejavacp()).value_$eq(true);
-
-		flinkILoop.settings_$eq(settings);
-		flinkILoop.createInterpreter();
+		FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
 
 		ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv();
 
-		env.fromElements(1).output(new DiscardingOutputFormat<Integer>());
+		assertTrue(env instanceof RemoteEnvironment);
 
-		env.execute("Test job");
+		RemoteEnvironment remoteEnv = (RemoteEnvironment) env;
 
-		Configuration forwardedConfiguration = testPlanExecutor.getConfiguration();
+		Configuration configUnderTest = remoteEnv.getExecutorConfiguration();
 
-		assertEquals(configuration, forwardedConfiguration);
+		assertEquals(configuration, configUnderTest);
 	}
 
 	@Test
@@ -114,60 +71,4 @@ public class FlinkILoopTest extends TestLogger {
 
 		assertEquals(configuration, forwardedConfiguration);
 	}
-
-	static class TestPlanExecutor extends PlanExecutor {
-
-		private String host;
-		private int port;
-		private Configuration configuration;
-		private List<String> jars;
-		private List<String> globalClasspaths;
-
-		@Override
-		public JobExecutionResult executePlan(
-				Pipeline plan, List<URL> jarFiles, List<URL> globalClasspaths) throws Exception {
-			return null;
-		}
-
-		public String getHost() {
-			return host;
-		}
-
-		public void setHost(String host) {
-			this.host = host;
-		}
-
-		public int getPort() {
-			return port;
-		}
-
-		public void setPort(int port) {
-			this.port = port;
-		}
-
-		public Configuration getConfiguration() {
-			return configuration;
-		}
-
-		public void setConfiguration(Configuration configuration) {
-			this.configuration = configuration;
-		}
-
-		public List<String> getJars() {
-			return jars;
-		}
-
-		public void setJars(List<String> jars) {
-			this.jars = jars;
-		}
-
-		public List<String> getGlobalClasspaths() {
-			return globalClasspaths;
-		}
-
-		public void setGlobalClasspaths(List<String> globalClasspaths) {
-			this.globalClasspaths = globalClasspaths;
-		}
-	}
-
 }