Posted to commits@flink.apache.org by tr...@apache.org on 2016/04/25 15:31:06 UTC

flink git commit: [FLINK-3774] [shell] Forwards Flink configuration to PlanExecutor

Repository: flink
Updated Branches:
  refs/heads/master 3f4e2f88b -> 0ee7c79a3


[FLINK-3774] [shell] Forwards Flink configuration to PlanExecutor

The ScalaShellRemoteEnvironment did not properly forward the given Flink configuration
to the PlanExecutor. Consequently, it was not possible to configure the Client to connect
to an HA cluster. This PR corrects the forwarding.

Fix failing FlinkILoopTest with Scala 2.11

This closes #1904.
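
For context, a minimal sketch (not part of the commit) of how a client-side Configuration now travels from the Scala shell down to the remote PlanExecutor; it mirrors the new FlinkILoopTest included below. The HA keys and host names used here are illustrative placeholders, not values taken from this change.

	import org.apache.flink.api.scala.FlinkILoop;
	import org.apache.flink.configuration.Configuration;
	import scala.Option;

	public class HaShellConfigSketch {
		public static void main(String[] args) throws Exception {
			// Client-side configuration that should reach the Client/PlanExecutor,
			// e.g. HA settings (keys below are illustrative placeholders).
			Configuration clientConfig = new Configuration();
			clientConfig.setString("recovery.mode", "zookeeper");
			clientConfig.setString("recovery.zookeeper.quorum", "zk-host:2181");

			// The shell hands this configuration to ScalaShellRemoteEnvironment, which
			// after this change forwards it unchanged to PlanExecutor.createRemoteExecutor
			// instead of substituting a fresh, empty Configuration.
			FlinkILoop shell = new FlinkILoop(
				"jobmanager-host", 6123, clientConfig, Option.<String[]>empty());

			// Jobs submitted through the shell's batch environment (shell.scalaBenv())
			// are now executed with clientConfig, so an HA cluster can be addressed.
		}
	}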


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ee7c79a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ee7c79a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ee7c79a

Branch: refs/heads/master
Commit: 0ee7c79a32f3aa39f740bdecbf53487f39f6591f
Parents: 3f4e2f8
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Apr 18 17:41:47 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Apr 25 15:30:30 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/PlanExecutor.java   |  11 +-
 .../flink/api/java/RemoteEnvironment.java       |  39 +++--
 .../minicluster/LocalFlinkMiniCluster.scala     |   2 -
 .../org/apache/flink/api/java/JarHelper.java    |   2 +-
 .../api/java/ScalaShellRemoteEnvironment.java   |  46 ++---
 .../org/apache/flink/api/scala/FlinkILoop.scala |   7 +-
 .../apache/flink/api/java/FlinkILoopTest.java   | 174 +++++++++++++++++++
 7 files changed, 232 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index add8b80..87f0e09 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.URL;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -177,7 +176,7 @@ public abstract class PlanExecutor {
 	 * @return A remote executor.
 	 */
 	public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
-			URL[] jarFiles, URL[] globalClasspaths) {
+			List<URL> jarFiles, List<URL> globalClasspaths) {
 		if (hostname == null) {
 			throw new IllegalArgumentException("The hostname must not be null.");
 		}
@@ -187,10 +186,10 @@ public abstract class PlanExecutor {
 		
 		Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
 		
-		List<URL> files = (jarFiles == null || jarFiles.length == 0) ?
-				Collections.<URL>emptyList() : Arrays.asList(jarFiles);
-		List<URL> paths = (globalClasspaths == null || globalClasspaths.length == 0) ?
-				Collections.<URL>emptyList() : Arrays.asList(globalClasspaths);
+		List<URL> files = (jarFiles == null) ?
+				Collections.<URL>emptyList() : jarFiles;
+		List<URL> paths = (globalClasspaths == null) ?
+				Collections.<URL>emptyList() : globalClasspaths;
 
 		try {
 			PlanExecutor executor = (clientConfiguration == null) ?

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 223ebee..34a54ba 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -30,6 +30,10 @@ import org.apache.flink.configuration.Configuration;
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
@@ -51,19 +55,19 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	protected final int port;
 
 	/** The jar files that need to be attached to each job */
-	private final URL[] jarFiles;
+	protected final List<URL> jarFiles;
 
 	/** The configuration used by the client that connects to the cluster */
-	private Configuration clientConfiguration;
+	protected Configuration clientConfiguration;
 	
 	/** The remote executor lazily created upon first use */
-	private PlanExecutor executor;
+	protected PlanExecutor executor;
 	
 	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
 	private Thread shutdownHook;
 
 	/** The classpaths that need to be attached to each job */
-	private final URL[] globalClasspaths;
+	protected final List<URL> globalClasspaths;
 
 	/**
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -133,26 +137,31 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		this.port = port;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
-			for (int i = 0; i < jarFiles.length; i++) {
+			this.jarFiles = new ArrayList<>(jarFiles.length);
+			for (String jarFile : jarFiles) {
 				try {
-					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+					this.jarFiles.add(new File(jarFile).getAbsoluteFile().toURI().toURL());
 				} catch (MalformedURLException e) {
 					throw new IllegalArgumentException("JAR file path invalid", e);
 				}
 			}
 		}
 		else {
-			this.jarFiles = null;
+			this.jarFiles = Collections.emptyList();
+		}
+
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		} else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
-		this.globalClasspaths = globalClasspaths;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		ensureExecutorCreated();
+		PlanExecutor executor = getExecutor();
 
 		Plan p = createProgramPlan(jobName);
 
@@ -178,7 +187,11 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		}
 		else {
 			PlanExecutor le = PlanExecutor.createLocalExecutor(null);
-			return le.getOptimizerPlanAsJSON(p);
+			String plan = le.getOptimizerPlanAsJSON(p);
+
+			le.stop();
+
+			return plan;
 		}
 	}
 
@@ -190,7 +203,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		installShutdownHook();
 	}
 	
-	private void ensureExecutorCreated() throws Exception {
+	protected PlanExecutor getExecutor() throws Exception {
 		if (executor == null) {
 			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
 				jarFiles, globalClasspaths);
@@ -202,6 +215,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 			executor.start();
 			installShutdownHook();
 		}
+
+		return executor;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 1b398de..5bebd48 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.util
-
 import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.api.common.JobID
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
index 83fb342..e00c602 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
@@ -164,7 +164,7 @@ public class JarHelper
 		File f = new File(dirOrFile2jar, dirList[i]);
 		jarDir(f,jos,subPath);
 		}
-	} else {
+	} else if (dirOrFile2jar.exists()) {
 		if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
 		{
 		if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 52cbfe6..1465e1d 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,14 +19,11 @@ package org.apache.flink.api.java;
  * limitations under the License.
  */
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
@@ -53,41 +50,36 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
 	 */
-	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) {
+	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) throws Exception {
 		super(host, port, clientConfig, jarFiles, null);
 		this.flinkILoop = flinkILoop;
 	}
 
-	/**
-	 * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
-	 *
-	 * @param jobName name of the job as string
-	 * @return Result of the computation
-	 * @throws Exception
-	 */
 	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		Plan p = createProgramPlan(jobName);
+	protected PlanExecutor getExecutor() throws Exception {
+		// check if we had already started a PlanExecutor. If true, then stop it, because there will
+		// be a new jar file available for the user code classes
+		if (this.executor != null) {
+			this.executor.stop();
+		}
 
+		// write generated classes to disk so that they can be shipped to the cluster
 		URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
 
-		// get "external jars, and add the shell command jar, pass to executor
-		List<URL> alljars = new ArrayList<>();
-		// get external (library) jars
-		String[] extJars = this.flinkILoop.getExternalJars();
+		List<URL> allJarFiles = new ArrayList<>(jarFiles);
+		allJarFiles.add(jarUrl);
 
-		for (String extJar : extJars) {
-			URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
-			alljars.add(extJarUrl);
-		}
+		this.executor = PlanExecutor.createRemoteExecutor(
+			host,
+			port,
+			clientConfiguration,
+			allJarFiles,
+			globalClasspaths
+		);
 
-		// add shell commands
-		alljars.add(jarUrl);
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
-				alljars.toArray(new URL[alljars.size()]), null);
+		executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
 
-		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
+		return executor;
 	}
 
 	public static void disableAllContextAndOtherEnvironments() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index b17cf15..3ac5da8 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -71,7 +71,12 @@ class FlinkILoop(
     ScalaShellRemoteEnvironment.resetContextEnvironments()
     
     // create our environment that submits against the cluster (local or remote)
-    val remoteBenv = new ScalaShellRemoteEnvironment(host, port, this, clientConfig)
+    val remoteBenv = new ScalaShellRemoteEnvironment(
+      host,
+      port,
+      this,
+      clientConfig,
+      this.getExternalJars(): _*)
     val remoteSenv = new ScalaShellRemoteStreamEnvironment(host, port, this);
     // prevent further instantiation of environments
     ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()

http://git-wip-us.apache.org/repos/asf/flink/blob/0ee7c79a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
new file mode 100644
index 0000000..a3c272f
--- /dev/null
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.BDDMockito;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.Option;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PlanExecutor.class)
+public class FlinkILoopTest extends TestLogger {
+
+	@Test
+	public void testConfigurationForwarding() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString("foobar", "foobar");
+		FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
+
+		final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
+
+		PowerMockito.mockStatic(PlanExecutor.class);
+		BDDMockito.given(PlanExecutor.createRemoteExecutor(
+			Matchers.anyString(),
+			Matchers.anyInt(),
+			Matchers.any(Configuration.class),
+			Matchers.any(java.util.List.class),
+			Matchers.any(java.util.List.class)
+		)).willAnswer(new Answer<PlanExecutor>() {
+			@Override
+			public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
+				testPlanExecutor.setHost((String)invocation.getArguments()[0]);
+				testPlanExecutor.setPort((Integer)invocation.getArguments()[1]);
+				testPlanExecutor.setConfiguration((Configuration)invocation.getArguments()[2]);
+				testPlanExecutor.setJars((List<String>)invocation.getArguments()[3]);
+				testPlanExecutor.setGlobalClasspaths((List<String>)invocation.getArguments()[4]);
+
+				return testPlanExecutor;
+			}
+		});
+
+		Settings settings = new Settings();
+		((MutableSettings.BooleanSetting)settings.usejavacp()).value_$eq(true);
+
+		flinkILoop.settings_$eq(settings);
+		flinkILoop.createInterpreter();
+
+		ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv();
+
+		env.fromElements(1).output(new DiscardingOutputFormat<Integer>());
+
+		env.execute("Test job");
+
+		Configuration forwardedConfiguration = testPlanExecutor.getConfiguration();
+
+		assertEquals(configuration, forwardedConfiguration);
+	}
+
+	static class TestPlanExecutor extends PlanExecutor {
+
+		private String host;
+		private int port;
+		private Configuration configuration;
+		private List<String> jars;
+		private List<String> globalClasspaths;
+
+		@Override
+		public void start() throws Exception {
+
+		}
+
+		@Override
+		public void stop() throws Exception {
+
+		}
+
+		@Override
+		public boolean isRunning() {
+			return false;
+		}
+
+		@Override
+		public JobExecutionResult executePlan(Plan plan) throws Exception {
+			return null;
+		}
+
+		@Override
+		public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
+			return null;
+		}
+
+		@Override
+		public void endSession(JobID jobID) throws Exception {
+
+		}
+
+		public String getHost() {
+			return host;
+		}
+
+		public void setHost(String host) {
+			this.host = host;
+		}
+
+		public int getPort() {
+			return port;
+		}
+
+		public void setPort(int port) {
+			this.port = port;
+		}
+
+		public Configuration getConfiguration() {
+			return configuration;
+		}
+
+		public void setConfiguration(Configuration configuration) {
+			this.configuration = configuration;
+		}
+
+		public List<String> getJars() {
+			return jars;
+		}
+
+		public void setJars(List<String> jars) {
+			this.jars = jars;
+		}
+
+		public List<String> getGlobalClasspaths() {
+			return globalClasspaths;
+		}
+
+		public void setGlobalClasspaths(List<String> globalClasspaths) {
+			this.globalClasspaths = globalClasspaths;
+		}
+	}
+
+}