You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/22 14:20:28 UTC

[1/2] flink git commit: [FLINK-1918] [client] Fix misleading NullPointerException in case of unresolvable host names

Repository: flink
Updated Branches:
  refs/heads/master df7c61e2e -> 2b8db40ac


[FLINK-1918] [client] Fix misleading NullPointerException in case of unresolvable host names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b8db40a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b8db40a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b8db40a

Branch: refs/heads/master
Commit: 2b8db40ac40d70027ce331f3a04c6ca7aa562a84
Parents: b704312
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 21 21:29:23 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 22 14:19:46 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |  14 ++-
 .../org/apache/flink/client/program/Client.java | 108 ++++++++++++-------
 .../client/CliFrontendPackageProgramTest.java   |  20 ++--
 .../RemoteExecutorHostnameResolutionTest.java   |  78 ++++++++++++++
 .../program/ClientHostnameResolutionTest.java   |  74 +++++++++++++
 .../environment/RemoteStreamEnvironment.java    |  36 ++++---
 .../ExecutionEnvironmentITCase.java             |   3 +
 7 files changed, 267 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index a523169..373d70c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -124,18 +124,26 @@ public class RemoteExecutor extends PlanExecutor {
 	// --------------------------------------------------------------------------------------------
 	//   Utilities
 	// --------------------------------------------------------------------------------------------
-	public static InetSocketAddress getInetFromHostport(String hostport) {
+
+	/**
+	 * Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
+	 * The returned InetSocketAddress may be unresolved!
+	 * 
+	 * @param hostport The "host:port" string.
+	 * @return The converted InetSocketAddress.
+	 */
+	private static InetSocketAddress getInetFromHostport(String hostport) {
 		// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
 		URI uri;
 		try {
 			uri = new URI("my://" + hostport);
 		} catch (URISyntaxException e) {
-			throw new RuntimeException("Could not identify hostname and port", e);
+			throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
 		}
 		String host = uri.getHost();
 		int port = uri.getPort();
 		if (host == null || port == -1) {
-			throw new RuntimeException("Could not identify hostname and port");
+			throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
 		}
 		return new InetSocketAddress(host, port);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index b4e5af1..c61e814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -22,11 +22,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.List;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -43,7 +46,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -65,12 +67,20 @@ import com.google.common.base.Preconditions;
 public class Client {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+
+	/** The configuration to use for the client (optimizer, timeouts, ...) */
+	private final Configuration configuration;
+
+	/** The address of the JobManager to send the program to */
+	private final InetSocketAddress jobManagerAddress;
+
+	/** The optimizer used in the optimization of batch programs */
+	private final Optimizer compiler;
+
+	/** The class loader to use for classes from the user program (e.g., functions and data types) */
+	private final ClassLoader userCodeClassLoader;
 	
-	
-	private final Configuration configuration;	// the configuration describing the job manager address
-	
-	private final Optimizer compiler;		// the compiler to compile the jobs
-	
+	/** Flag indicating whether to sysout print execution updates */
 	private boolean printStatusDuringExecution = true;
 
 	/**
@@ -79,12 +89,9 @@ public class Client {
 	 */
 	private int maxSlots = -1;
 
-	/**
-	 * ID of the last job submitted with this client.
-	 */
+	/** ID of the last job submitted with this client. */
 	private JobID lastJobId = null;
-
-	private ClassLoader userCodeClassLoader;
+	
 	
 	// ------------------------------------------------------------------------
 	//                            Construction
@@ -96,13 +103,30 @@ public class Client {
 	 * 
 	 * @param jobManagerAddress Address and port of the job-manager.
 	 */
-	public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
+	public Client(InetSocketAddress jobManagerAddress, Configuration config, 
+							ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException
+	{
+		Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
 		Preconditions.checkNotNull(config, "Configuration is null");
+		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
+		
 		this.configuration = config;
 		
-		// using the host string instead of the host name saves a reverse name lookup
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
+		if (jobManagerAddress.isUnresolved()) {
+			// address is unresolved, resolve it
+			String host = jobManagerAddress.getHostString();
+			try {
+				InetAddress address = InetAddress.getByName(host);
+				this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
+			}
+			catch (UnknownHostException e) {
+				throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
+			}
+		}
+		else {
+			// address is already resolved, use it as is
+			this.jobManagerAddress = jobManagerAddress;
+		}
 		
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
 		this.userCodeClassLoader = userCodeClassLoader;
@@ -110,42 +134,55 @@ public class Client {
 	}
 
 	/**
-	 * Creates a instance that submits the programs to the job-manager defined in the
-	 * configuration.
+	 * Creates a instance that submits the programs to the JobManager defined in the
+	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
+	 * if that is not possible.
 	 * 
 	 * @param config The config used to obtain the job-manager's address.
+	 * @param userCodeClassLoader The class loader to use for loading user code classes.   
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader) {
+	public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
 		Preconditions.checkNotNull(config, "Configuration is null");
+		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
+		
 		this.configuration = config;
+		this.userCodeClassLoader = userCodeClassLoader;
 		
 		// instantiate the address to the job manager
 		final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 		if (address == null) {
-			throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
+			throw new IllegalConfigurationException(
+					"Cannot find address to job manager's RPC service in the global configuration.");
 		}
 		
-		final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+		final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+														ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
 		if (port < 0) {
-			throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
+			throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
+		}
+		
+		try {
+			InetAddress inetAddress = InetAddress.getByName(address);
+			this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
+		}
+		catch (UnknownHostException e) {
+			throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address
+					+ "' specified in the configuration");
 		}
 
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
-		this.userCodeClassLoader = userCodeClassLoader;
 	}
-	
+
+	/**
+	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
+	 * All updates are logged via the SLF4J loggers regardless of this setting.
+	 * 
+	 * @param print True to print updates to standard out during execution, false to not print them.
+	 */
 	public void setPrintStatusDuringExecution(boolean print) {
 		this.printStatusDuringExecution = print;
 	}
 
-	public String getJobManagerAddress() {
-		return this.configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-	}
-	
-	public int getJobManagerPort() {
-		return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-	}
-
 	/**
 	 * @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
 	 * connected to this client.
@@ -316,14 +353,7 @@ public class Client {
 
 	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
 		this.lastJobId = jobGraph.getJobID();
-
-		InetSocketAddress jobManagerAddress;
-		try {
-			jobManagerAddress = JobClient.getJobManagerAddress(configuration);
-		}
-		catch (IOException e) {
-			throw new ProgramInvocationException(e.getMessage(), e);
-		}
+		
 		LOG.info("JobManager actor system address is " + jobManagerAddress);
 		
 		LOG.info("Starting client actor system");

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 2ef84bc..c9ce12b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -294,25 +294,23 @@ public class CliFrontendPackageProgramTest {
 			assertArrayEquals(progArgs, prog.getArguments());
 
 			Configuration c = new Configuration();
-			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
+			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			Client cli = new Client(c, getClass().getClassLoader());
 			
+			// we expect this to fail with a "ClassNotFoundException"
 			cli.getOptimizedPlanAsJson(prog, 666);
+			fail("Should have failed with a ClassNotFoundException");
 		}
-		catch (ProgramInvocationException pie) {
-			assertTrue("Classloader was not called", callme[0]);
-			// class not found exception is expected as some point
-			if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
-				System.err.println(pie.getMessage());
-				pie.printStackTrace();
-				fail("Program caused an exception: " + pie.getMessage());
+		catch (ProgramInvocationException e) {
+			if (!(e.getCause() instanceof ClassNotFoundException)) {
+				e.printStackTrace();
+				fail("Program didn't throw ClassNotFoundException");
 			}
+			assertTrue("Classloader was not called", callme[0]);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			assertTrue("Classloader was not called", callme[0]);
-			fail("Program caused an exception: " + e.getMessage());
+			fail("Program failed with the wrong exception: " + e.getClass().getName());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
new file mode 100644
index 0000000..a1bd0e2
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+public class RemoteExecutorHostnameResolutionTest {
+
+	private static final String nonExistingHostname = "foo.bar.com.invalid";
+	private static final int port = 14451;
+	
+	
+	@Test
+	public void testUnresolvableHostname1() {
+		try {
+			RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
+			exec.executePlan(getProgram());
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testUnresolvableHostname2() {
+		try {
+			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
+			RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
+			exec.executePlan(getProgram());
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static Plan getProgram() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
+		return env.createProgramPlan();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
new file mode 100644
index 0000000..2cdb1a0
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that verify that the client correctly handles non-resolvable host names and does not
+ * fail with another exception
+ */
+public class ClientHostnameResolutionTest {
+	
+	private static final String nonExistingHostname = "foo.bar.com.invalid";
+	
+	@Test
+	public void testUnresolvableHostname1() {
+		try {
+			InetSocketAddress addr = new InetSocketAddress(nonExistingHostname, 17234);
+			new Client(addr, new Configuration(), getClass().getClassLoader(), 1);
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testUnresolvableHostname2() {
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+			
+			new Client(config, getClass().getClassLoader());
+			fail("This should fail with an UnknownHostException");
+		}
+		catch (UnknownHostException e) {
+			// that is what we want!
+		}
+		catch (Exception e) {
+			System.err.println("Wrong exception!");
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index c3060fb..50127cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -37,9 +38,9 @@ import org.slf4j.LoggerFactory;
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
-	private String host;
-	private int port;
-	private List<File> jarFiles;
+	private final String host;
+	private final int port;
+	private final List<File> jarFiles;
 
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
@@ -82,14 +83,14 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute() {
+	public JobExecutionResult execute() throws ProgramInvocationException {
 
 		JobGraph jobGraph = streamGraph.getJobGraph();
 		return executeRemotely(jobGraph);
 	}
 
 	@Override
-	public JobExecutionResult execute(String jobName) {
+	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
 
 		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
 		return executeRemotely(jobGraph);
@@ -102,7 +103,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            jobGraph to execute
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	private JobExecutionResult executeRemotely(JobGraph jobGraph) {
+	private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
@@ -112,20 +113,29 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		}
 
 		Configuration configuration = jobGraph.getJobConfiguration();
-		Client client = new Client(new InetSocketAddress(host, port), configuration,
-				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-		client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-		
+		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+
 		try {
+			Client client = new Client(new InetSocketAddress(host, port), configuration, usercodeClassLoader, -1);
+			client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+			
 			JobSubmissionResult result = client.run(jobGraph, true);
-			if(result instanceof JobExecutionResult) {
+			if (result instanceof JobExecutionResult) {
 				return (JobExecutionResult) result;
 			} else {
 				LOG.warn("The Client didn't return a JobExecutionResult");
 				return new JobExecutionResult(result.getJobID(), -1, null);
 			}
-		} catch (ProgramInvocationException e) {
-			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+		catch (ProgramInvocationException e) {
+			throw e;
+		}
+		catch (UnknownHostException e) {
+			throw new ProgramInvocationException(e.getMessage(), e);
+		}
+		catch (Exception e) {
+			String term = e.getMessage() == null ? "." : (": " + e.getMessage());
+			throw new ProgramInvocationException("The program execution failed" + term, e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index 4ebc381..109af1e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test ExecutionEnvironment from user perspective
  */
+@SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
 	private static final int PARALLELISM = 5;
@@ -66,8 +67,10 @@ public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
 	public void testLocalEnvironmentWithConfig() throws Exception {
 		Configuration conf = new Configuration();
 		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		
 		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
 		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
 				.rebalance()


[2/2] flink git commit: [FLINK-1878] [streaming] Stream environments accept a flag that controls sysout logging during execution.

Posted by se...@apache.org.
[FLINK-1878] [streaming] Stream environments accept a flag that controls sysout logging during execution.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7043123
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7043123
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7043123

Branch: refs/heads/master
Commit: b70431239a5e18555866addb41ee6edf2b79ff60
Parents: df7c61e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 19:02:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 22 14:19:46 2015 +0200

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java         |  8 ++++----
 .../api/environment/RemoteStreamEnvironment.java        | 10 ++++++----
 .../org/apache/flink/streaming/util/ClusterUtil.java    | 12 ++++++------
 .../test/checkpointing/StreamCheckpointingITCase.java   |  1 +
 4 files changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 07a552b..a59407c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil;
 
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-	protected static ClassLoader userClassLoader;
-
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
 	 * default name.
@@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute() throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
+		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
+																getConfig().isSysoutLoggingEnabled());
 	}
 
 	/**
@@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism());
+		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
+																		getConfig().isSysoutLoggingEnabled());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 3308ab7..c3060fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		this.host = host;
 		this.port = port;
 		this.jarFiles = new ArrayList<File>();
-		for (int i = 0; i < jarFiles.length; i++) {
-			File file = new File(jarFiles[i]);
+		for (String jarFile : jarFiles) {
+			File file = new File(jarFile);
 			try {
 				JobWithJars.checkJarFile(file);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
 			}
 			this.jarFiles.add(file);
@@ -113,7 +114,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		Configuration configuration = jobGraph.getJobConfiguration();
 		Client client = new Client(new InetSocketAddress(host, port), configuration,
 				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-
+		client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+		
 		try {
 			JobSubmissionResult result = client.run(jobGraph, true);
 			if(result instanceof JobExecutionResult) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 64b7bd8..409e5ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -42,9 +42,8 @@ public class ClusterUtil {
 	 *            memorySize
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
-			throws Exception {
-
+	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize,
+																boolean printDuringExecution) throws Exception {
 		Configuration configuration = jobGraph.getJobConfiguration();
 
 		LocalFlinkMiniCluster exec = null;
@@ -58,7 +57,7 @@ public class ClusterUtil {
 		try {
 			exec = new LocalFlinkMiniCluster(configuration, true);
 
-			SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true);
+			SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution);
 			return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
 		}
 		finally {
@@ -68,7 +67,8 @@ public class ClusterUtil {
 		}
 	}
 
-	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
-		return runOnMiniCluster(jobGraph, numOfSlots, -1);
+	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots,
+														boolean printDuringExecution) throws Exception {
+		return runOnMiniCluster(jobGraph, numOfSlots, -1, printDuringExecution);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index fe2fdb8..c4bd095 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -100,6 +100,7 @@ public class StreamCheckpointingITCase {
 																	"localhost", cluster.getJobManagerRPCPort());
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(200);
+			env.getConfig().disableSysoutLogging();
 
 			DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {