You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/04/25 18:00:45 UTC

flink git commit: [FLINK-3778] [shell] Forward configuration from FlinkILoop to ScalaShellRemoteStreamEnvironment

Repository: flink
Updated Branches:
  refs/heads/master 0ee7c79a3 -> 7498d3e35


[FLINK-3778] [shell] Forward configuration from FlinkILoop to ScalaShellRemoteStreamEnvironment

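With this PR, the configuration of the FlinkILoop is properly forwarded to the
ScalaShellRemoteStreamEnvironment. Previously, the streaming environment was
created from only the host, port, and FlinkILoop, so the shell's client
configuration was ignored. It now receives that configuration together with
the external JAR files.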

This closes #1906.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7498d3e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7498d3e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7498d3e3

Branch: refs/heads/master
Commit: 7498d3e35a29449270a88a30eb32b7de74887f5b
Parents: 0ee7c79
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Apr 18 18:34:55 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Apr 25 17:50:32 2016 +0200

----------------------------------------------------------------------
 .../java/ScalaShellRemoteStreamEnvironment.java | 41 +++++++++++---------
 .../org/apache/flink/api/scala/FlinkILoop.scala |  7 +++-
 .../apache/flink/api/java/FlinkILoopTest.java   | 21 ++++++++++
 .../environment/RemoteStreamEnvironment.java    | 29 ++++++++------
 .../api/scala/StreamExecutionEnvironment.scala  |  5 +++
 5 files changed, 72 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7498d3e3/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
index 3fa8358..43c8174 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
@@ -27,9 +28,10 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 
 
 public class ScalaShellRemoteStreamEnvironment extends RemoteStreamEnvironment {
@@ -45,12 +47,19 @@ public class ScalaShellRemoteStreamEnvironment extends RemoteStreamEnvironment {
 	 *				 program should be executed.
 	 * @param port	 The port of the master (JobManager), where the program should
 	 *				 be executed.
+	 * @param configuration The configuration to be used for the environment
 	 * @param jarFiles The JAR files with code that needs to be shipped to the
 	 *				 cluster. If the program uses user-defined functions,
 	 *				 user-defined input formats, or any libraries, those must be
 	 */
-	public ScalaShellRemoteStreamEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-		super(host, port, jarFiles);
+	public ScalaShellRemoteStreamEnvironment(
+		String host,
+		int port,
+		FlinkILoop flinkILoop,
+		Configuration configuration,
+		String... jarFiles) {
+
+		super(host, port, configuration, jarFiles);
 		this.flinkILoop = flinkILoop;
 	}
 	/**
@@ -58,30 +67,24 @@ public class ScalaShellRemoteStreamEnvironment extends RemoteStreamEnvironment {
 	 *
 	 * @param streamGraph
 	 *            Stream Graph to execute
+	 * @param jarFiles
+	 * 			  List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph) throws ProgramInvocationException {
-		URL jarUrl = null;
+	@Override
+	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+		URL jarUrl;
 		try {
 			jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
 		} catch (MalformedURLException e) {
-			e.printStackTrace();
+			throw new ProgramInvocationException("Could not write the user code classes to disk.", e);
 		}
 
-		jarFiles.add(jarUrl);
-		// get external (library) jars
-		String[] extJars = this.flinkILoop.getExternalJars();
+		List<URL> allJarFiles = new ArrayList<>(jarFiles.size() + 1);
+		allJarFiles.addAll(jarFiles);
+		allJarFiles.add(jarUrl);
 
-		for (String extJar : extJars) {
-			URL extJarUrl = null;
-			try {
-				extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
-			} catch (MalformedURLException e) {
-				e.printStackTrace();
-			}
-			jarFiles.add(extJarUrl);
-		}
-		return super.executeRemotely(streamGraph);
+		return super.executeRemotely(streamGraph, allJarFiles);
 	}
 
 	public void setAsContext() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7498d3e3/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 3ac5da8..e02c2b0 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -77,7 +77,12 @@ class FlinkILoop(
       this,
       clientConfig,
       this.getExternalJars(): _*)
-    val remoteSenv = new ScalaShellRemoteStreamEnvironment(host, port, this);
+    val remoteSenv = new ScalaShellRemoteStreamEnvironment(
+      host,
+      port,
+      this,
+      clientConfig,
+      getExternalJars(): _*)
     // prevent further instantiation of environments
     ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/7498d3e3/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index a3c272f..919f805 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,6 +44,7 @@ import scala.tools.nsc.settings.MutableSettings;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(PlanExecutor.class)
@@ -92,6 +95,24 @@ public class FlinkILoopTest extends TestLogger {
 		assertEquals(configuration, forwardedConfiguration);
 	}
 
+	@Test
+	public void testConfigurationForwardingStreamEnvironment() {
+		Configuration configuration = new Configuration();
+		configuration.setString("foobar", "foobar");
+
+		FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty());
+
+		StreamExecutionEnvironment streamEnv = flinkILoop.scalaSenv().getJavaEnv();
+
+		assertTrue(streamEnv instanceof RemoteStreamEnvironment);
+
+		RemoteStreamEnvironment remoteStreamEnv = (RemoteStreamEnvironment) streamEnv;
+
+		Configuration forwardedConfiguration = remoteStreamEnv.getClientConfiguration();
+
+		assertEquals(configuration, forwardedConfiguration);
+	}
+
 	static class TestPlanExecutor extends PlanExecutor {
 
 		private String host;

http://git-wip-us.apache.org/repos/asf/flink/blob/7498d3e3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 66dfa06..03945a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -52,10 +52,10 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	private final int port;
 
 	/** The configuration used to parametrize the client that connects to the remote cluster */
-	private final Configuration config;
+	private final Configuration clientConfiguration;
 
 	/** The jar files that need to be attached to each job */
-	protected final List<URL> jarFiles;
+	private final List<URL> jarFiles;
 	
 	/** The classpaths that need to be attached to each job */
 	private final List<URL> globalClasspaths;
@@ -90,7 +90,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param port
 	 *            The port of the master (JobManager), where the program should
 	 *            be executed.
-	 * @param config
+	 * @param clientConfiguration
 	 *            The configuration used to parametrize the client that connects to the
 	 *            remote cluster.
 	 * @param jarFiles
@@ -99,8 +99,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            user-defined input formats, or any libraries, those must be
 	 *            provided in the JAR files.
 	 */
-	public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
-		this(host, port, config, jarFiles, null);
+	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
+		this(host, port, clientConfiguration, jarFiles, null);
 	}
 
 	/**
@@ -113,7 +113,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param port
 	 *            The port of the master (JobManager), where the program should
 	 *            be executed.
-	 * @param config
+	 * @param clientConfiguration
 	 *            The configuration used to parametrize the client that connects to the
 	 *            remote cluster.
 	 * @param jarFiles
@@ -127,7 +127,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
 	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
-	public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
+	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +
@@ -143,7 +143,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 		this.host = host;
 		this.port = port;
-		this.config = config == null ? new Configuration() : config;
+		this.clientConfiguration = clientConfiguration == null ? new Configuration() : clientConfiguration;
 		this.jarFiles = new ArrayList<>(jarFiles.length);
 		for (String jarFile : jarFiles) {
 			try {
@@ -169,7 +169,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 		transformations.clear();
-		return executeRemotely(streamGraph);
+		return executeRemotely(streamGraph, jarFiles);
 	}
 
 	/**
@@ -177,9 +177,11 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 * 
 	 * @param streamGraph
 	 *            Stream Graph to execute
+	 * @param jarFiles
+	 * 			  List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph) throws ProgramInvocationException {
+	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
@@ -188,7 +190,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			getClass().getClassLoader());
 		
 		Configuration configuration = new Configuration();
-		configuration.addAll(this.config);
+		configuration.addAll(this.clientConfiguration);
 		
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
@@ -242,4 +244,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	public int getPort() {
 		return port;
 	}
+
+
+	public Configuration getClientConfiguration() {
+		return clientConfiguration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7498d3e3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 3736225..7be7840 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -42,6 +42,11 @@ import _root_.scala.language.implicitConversions
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
+    * @return the wrapped Java environment
+    */
+  def getJavaEnv: JavaEnv = javaEnv
+
+  /**
    * Gets the config object.
    */
   def getConfig = javaEnv.getConfig