You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/22 14:20:28 UTC
[1/2] flink git commit: [FLINK-1918] [client] Fix misleading
NullPointerException in case of unresolvable host names
Repository: flink
Updated Branches:
refs/heads/master df7c61e2e -> 2b8db40ac
[FLINK-1918] [client] Fix misleading NullPointerException in case of unresolvable host names
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b8db40a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b8db40a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b8db40a
Branch: refs/heads/master
Commit: 2b8db40ac40d70027ce331f3a04c6ca7aa562a84
Parents: b704312
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 21 21:29:23 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 22 14:19:46 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/RemoteExecutor.java | 14 ++-
.../org/apache/flink/client/program/Client.java | 108 ++++++++++++-------
.../client/CliFrontendPackageProgramTest.java | 20 ++--
.../RemoteExecutorHostnameResolutionTest.java | 78 ++++++++++++++
.../program/ClientHostnameResolutionTest.java | 74 +++++++++++++
.../environment/RemoteStreamEnvironment.java | 36 ++++---
.../ExecutionEnvironmentITCase.java | 3 +
7 files changed, 267 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index a523169..373d70c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -124,18 +124,26 @@ public class RemoteExecutor extends PlanExecutor {
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
- public static InetSocketAddress getInetFromHostport(String hostport) {
+
+ /**
+ * Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
+ * The returned InetSocketAddress may be unresolved!
+ *
+ * @param hostport The "host:port" string.
+ * @return The converted InetSocketAddress.
+ */
+ private static InetSocketAddress getInetFromHostport(String hostport) {
// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my://" + hostport);
} catch (URISyntaxException e) {
- throw new RuntimeException("Could not identify hostname and port", e);
+ throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
- throw new RuntimeException("Could not identify hostname and port");
+ throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
}
return new InetSocketAddress(host, port);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index b4e5af1..c61e814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -22,11 +22,14 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
@@ -43,7 +46,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -65,12 +67,20 @@ import com.google.common.base.Preconditions;
public class Client {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+
+ /** The configuration to use for the client (optimizer, timeouts, ...) */
+ private final Configuration configuration;
+
+ /** The address of the JobManager to send the program to */
+ private final InetSocketAddress jobManagerAddress;
+
+ /** The optimizer used in the optimization of batch programs */
+ private final Optimizer compiler;
+
+ /** The class loader to use for classes from the user program (e.g., functions and data types) */
+ private final ClassLoader userCodeClassLoader;
-
- private final Configuration configuration; // the configuration describing the job manager address
-
- private final Optimizer compiler; // the compiler to compile the jobs
-
+ /** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;
/**
@@ -79,12 +89,9 @@ public class Client {
*/
private int maxSlots = -1;
- /**
- * ID of the last job submitted with this client.
- */
+ /** ID of the last job submitted with this client. */
private JobID lastJobId = null;
-
- private ClassLoader userCodeClassLoader;
+
// ------------------------------------------------------------------------
// Construction
@@ -96,13 +103,30 @@ public class Client {
*
* @param jobManagerAddress Address and port of the job-manager.
*/
- public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
+ public Client(InetSocketAddress jobManagerAddress, Configuration config,
+ ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException
+ {
+ Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
Preconditions.checkNotNull(config, "Configuration is null");
+ Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
+
this.configuration = config;
- // using the host string instead of the host name saves a reverse name lookup
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
+ if (jobManagerAddress.isUnresolved()) {
+ // address is unresolved, resolve it
+ String host = jobManagerAddress.getHostString();
+ try {
+ InetAddress address = InetAddress.getByName(host);
+ this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
+ }
+ catch (UnknownHostException e) {
+ throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
+ }
+ }
+ else {
+ // address is already resolved, use it as is
+ this.jobManagerAddress = jobManagerAddress;
+ }
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
this.userCodeClassLoader = userCodeClassLoader;
@@ -110,42 +134,55 @@ public class Client {
}
/**
- * Creates a instance that submits the programs to the job-manager defined in the
- * configuration.
+ * Creates a instance that submits the programs to the JobManager defined in the
+ * configuration. This method will try to resolve the JobManager hostname and throw an exception
+ * if that is not possible.
*
* @param config The config used to obtain the job-manager's address.
+ * @param userCodeClassLoader The class loader to use for loading user code classes.
*/
- public Client(Configuration config, ClassLoader userCodeClassLoader) {
+ public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
Preconditions.checkNotNull(config, "Configuration is null");
+ Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
+
this.configuration = config;
+ this.userCodeClassLoader = userCodeClassLoader;
// instantiate the address to the job manager
final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (address == null) {
- throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
+ throw new IllegalConfigurationException(
+ "Cannot find address to job manager's RPC service in the global configuration.");
}
- final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+ final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if (port < 0) {
- throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
+ throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
+ }
+
+ try {
+ InetAddress inetAddress = InetAddress.getByName(address);
+ this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
+ }
+ catch (UnknownHostException e) {
+ throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address
+ + "' specified in the configuration");
}
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
- this.userCodeClassLoader = userCodeClassLoader;
}
-
+
+ /**
+ * Configures whether the client should print progress updates during the execution to {@code System.out}.
+ * All updates are logged via the SLF4J loggers regardless of this setting.
+ *
+ * @param print True to print updates to standard out during execution, false to not print them.
+ */
public void setPrintStatusDuringExecution(boolean print) {
this.printStatusDuringExecution = print;
}
- public String getJobManagerAddress() {
- return this.configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- }
-
- public int getJobManagerPort() {
- return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
- }
-
/**
* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
* connected to this client.
@@ -316,14 +353,7 @@ public class Client {
public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();
-
- InetSocketAddress jobManagerAddress;
- try {
- jobManagerAddress = JobClient.getJobManagerAddress(configuration);
- }
- catch (IOException e) {
- throw new ProgramInvocationException(e.getMessage(), e);
- }
+
LOG.info("JobManager actor system address is " + jobManagerAddress);
LOG.info("Starting client actor system");
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 2ef84bc..c9ce12b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -294,25 +294,23 @@ public class CliFrontendPackageProgramTest {
assertArrayEquals(progArgs, prog.getArguments());
Configuration c = new Configuration();
- c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
+ c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
Client cli = new Client(c, getClass().getClassLoader());
+ // we expect this to fail with a "ClassNotFoundException"
cli.getOptimizedPlanAsJson(prog, 666);
+ fail("Should have failed with a ClassNotFoundException");
}
- catch (ProgramInvocationException pie) {
- assertTrue("Classloader was not called", callme[0]);
- // class not found exception is expected as some point
- if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
- System.err.println(pie.getMessage());
- pie.printStackTrace();
- fail("Program caused an exception: " + pie.getMessage());
+ catch (ProgramInvocationException e) {
+ if (!(e.getCause() instanceof ClassNotFoundException)) {
+ e.printStackTrace();
+ fail("Program didn't throw ClassNotFoundException");
}
+ assertTrue("Classloader was not called", callme[0]);
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
- assertTrue("Classloader was not called", callme[0]);
- fail("Program caused an exception: " + e.getMessage());
+ fail("Program failed with the wrong exception: " + e.getClass().getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
new file mode 100644
index 0000000..a1bd0e2
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+public class RemoteExecutorHostnameResolutionTest {
+
+ private static final String nonExistingHostname = "foo.bar.com.invalid";
+ private static final int port = 14451;
+
+
+ @Test
+ public void testUnresolvableHostname1() {
+ try {
+ RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
+ exec.executePlan(getProgram());
+ fail("This should fail with an UnknownHostException");
+ }
+ catch (UnknownHostException e) {
+ // that is what we want!
+ }
+ catch (Exception e) {
+ System.err.println("Wrong exception!");
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testUnresolvableHostname2() {
+ try {
+ InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
+ RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
+ exec.executePlan(getProgram());
+ fail("This should fail with an UnknownHostException");
+ }
+ catch (UnknownHostException e) {
+ // that is what we want!
+ }
+ catch (Exception e) {
+ System.err.println("Wrong exception!");
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static Plan getProgram() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
+ return env.createProgramPlan();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
new file mode 100644
index 0000000..2cdb1a0
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientHostnameResolutionTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that verify that the client correctly handles non-resolvable host names and does not
+ * fail with another exception
+ */
+public class ClientHostnameResolutionTest {
+
+ private static final String nonExistingHostname = "foo.bar.com.invalid";
+
+ @Test
+ public void testUnresolvableHostname1() {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(nonExistingHostname, 17234);
+ new Client(addr, new Configuration(), getClass().getClassLoader(), 1);
+ fail("This should fail with an UnknownHostException");
+ }
+ catch (UnknownHostException e) {
+ // that is what we want!
+ }
+ catch (Exception e) {
+ System.err.println("Wrong exception!");
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testUnresolvableHostname2() {
+ try {
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+
+ new Client(config, getClass().getClassLoader());
+ fail("This should fail with an UnknownHostException");
+ }
+ catch (UnknownHostException e) {
+ // that is what we want!
+ }
+ catch (Exception e) {
+ System.err.println("Wrong exception!");
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index c3060fb..50127cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -37,9 +38,9 @@ import org.slf4j.LoggerFactory;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
- private String host;
- private int port;
- private List<File> jarFiles;
+ private final String host;
+ private final int port;
+ private final List<File> jarFiles;
/**
* Creates a new RemoteStreamEnvironment that points to the master
@@ -82,14 +83,14 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute() {
+ public JobExecutionResult execute() throws ProgramInvocationException {
JobGraph jobGraph = streamGraph.getJobGraph();
return executeRemotely(jobGraph);
}
@Override
- public JobExecutionResult execute(String jobName) {
+ public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
JobGraph jobGraph = streamGraph.getJobGraph(jobName);
return executeRemotely(jobGraph);
@@ -102,7 +103,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
* jobGraph to execute
* @return The result of the job execution, containing elapsed time and accumulators.
*/
- private JobExecutionResult executeRemotely(JobGraph jobGraph) {
+ private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
@@ -112,20 +113,29 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
Configuration configuration = jobGraph.getJobConfiguration();
- Client client = new Client(new InetSocketAddress(host, port), configuration,
- JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
- client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-
+ ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
+
try {
+ Client client = new Client(new InetSocketAddress(host, port), configuration, usercodeClassLoader, -1);
+ client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+
JobSubmissionResult result = client.run(jobGraph, true);
- if(result instanceof JobExecutionResult) {
+ if (result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
return new JobExecutionResult(result.getJobID(), -1, null);
}
- } catch (ProgramInvocationException e) {
- throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+ }
+ catch (ProgramInvocationException e) {
+ throw e;
+ }
+ catch (UnknownHostException e) {
+ throw new ProgramInvocationException(e.getMessage(), e);
+ }
+ catch (Exception e) {
+ String term = e.getMessage() == null ? "." : (": " + e.getMessage());
+ throw new ProgramInvocationException("The program execution failed" + term, e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b8db40a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index 4ebc381..109af1e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
/**
* Test ExecutionEnvironment from user perspective
*/
+@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
private static final int PARALLELISM = 5;
@@ -66,8 +67,10 @@ public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
public void testLocalEnvironmentWithConfig() throws Exception {
Configuration conf = new Configuration();
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+ env.getConfig().disableSysoutLogging();
DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
.rebalance()
[2/2] flink git commit: [FLINK-1878] [streaming] Stream environments
accept a flag that controls sysout logging during execution.
Posted by se...@apache.org.
[FLINK-1878] [streaming] Stream environments accept a flag that controls sysout logging during execution.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7043123
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7043123
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7043123
Branch: refs/heads/master
Commit: b70431239a5e18555866addb41ee6edf2b79ff60
Parents: df7c61e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 19:02:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 22 14:19:46 2015 +0200
----------------------------------------------------------------------
.../api/environment/LocalStreamEnvironment.java | 8 ++++----
.../api/environment/RemoteStreamEnvironment.java | 10 ++++++----
.../org/apache/flink/streaming/util/ClusterUtil.java | 12 ++++++------
.../test/checkpointing/StreamCheckpointingITCase.java | 1 +
4 files changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 07a552b..a59407c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
- protected static ClassLoader userClassLoader;
-
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a
* default name.
@@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute() throws Exception {
- return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
+ return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
+ getConfig().isSysoutLoggingEnabled());
}
/**
@@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism());
+ return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
+ getConfig().isSysoutLoggingEnabled());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 3308ab7..c3060fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
this.host = host;
this.port = port;
this.jarFiles = new ArrayList<File>();
- for (int i = 0; i < jarFiles.length; i++) {
- File file = new File(jarFiles[i]);
+ for (String jarFile : jarFiles) {
+ File file = new File(jarFile);
try {
JobWithJars.checkJarFile(file);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
}
this.jarFiles.add(file);
@@ -113,7 +114,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(host, port), configuration,
JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-
+ client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+
try {
JobSubmissionResult result = client.run(jobGraph, true);
if(result instanceof JobExecutionResult) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 64b7bd8..409e5ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -42,9 +42,8 @@ public class ClusterUtil {
* memorySize
* @return The result of the job execution, containing elapsed time and accumulators.
*/
- public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
- throws Exception {
-
+ public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize,
+ boolean printDuringExecution) throws Exception {
Configuration configuration = jobGraph.getJobConfiguration();
LocalFlinkMiniCluster exec = null;
@@ -58,7 +57,7 @@ public class ClusterUtil {
try {
exec = new LocalFlinkMiniCluster(configuration, true);
- SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true);
+ SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution);
return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
}
finally {
@@ -68,7 +67,8 @@ public class ClusterUtil {
}
}
- public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
- return runOnMiniCluster(jobGraph, numOfSlots, -1);
+ public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots,
+ boolean printDuringExecution) throws Exception {
+ return runOnMiniCluster(jobGraph, numOfSlots, -1, printDuringExecution);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7043123/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index fe2fdb8..c4bd095 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -100,6 +100,7 @@ public class StreamCheckpointingITCase {
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.enableCheckpointing(200);
+ env.getConfig().disableSysoutLogging();
DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {