Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:16 UTC

[11/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

[FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

Remove client-less factory methods from ZooKeeperUtils

Introduce default job id

This closes #3781.
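
As a sketch of the resulting API (illustrative only, derived from the diff
below; not code from this commit): the ClusterClient now receives its
HighAvailabilityServices from the outside and closes them on shutdown(),
instead of each ZooKeeper-based service opening and closing its own
CuratorFramework client.

    Configuration config = new Configuration();

    // Either let the client create the services itself (the new one-argument
    // constructor does exactly this) ...
    HighAvailabilityServices haServices =
        HighAvailabilityServicesUtils.createHighAvailabilityServices(
            config,
            Executors.directExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    // ... or inject them, e.g. for tests:
    ClusterClient client = new StandaloneClusterClient(config, haServices);
    try {
        // submit or retrieve jobs here
    } finally {
        // shutdown() also calls highAvailabilityServices.closeAndCleanupAllData()
        client.shutdown();
    }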


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddd6a99a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddd6a99a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddd6a99a

Branch: refs/heads/master
Commit: ddd6a99a95b56c52ea5b5153b7270b578f5479bc
Parents: a0bb99c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Mar 16 17:03:03 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:06:07 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   6 +-
 .../flink/client/cli/CustomCommandLine.java     |   4 +-
 .../flink/client/program/ClusterClient.java     |  89 ++++----
 .../client/program/StandaloneClusterClient.java |   8 +-
 .../RemoteExecutorHostnameResolutionTest.java   |  22 +-
 .../apache/flink/client/program/ClientTest.java |   5 +-
 ...rRetrievalServiceHostnameResolutionTest.java |  32 +--
 .../api/avro/AvroExternalJarProgramITCase.java  |  22 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../connectors/kafka/Kafka010ITCase.java        |  14 +-
 .../connectors/kafka/Kafka08ITCase.java         |   4 +-
 .../kafka/Kafka09SecuredRunITCase.java          |   2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  96 ++++----
 .../connectors/kafka/KafkaProducerTestBase.java |   2 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  18 +-
 .../connectors/kafka/KafkaTestBase.java         |  23 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   4 +-
 .../flink/contrib/streaming/CollectITCase.java  |   9 +-
 .../operations/DegreesWithExceptionITCase.java  |  42 ++--
 .../ReduceOnEdgesWithExceptionITCase.java       |  37 ++--
 .../ReduceOnNeighborsWithExceptionITCase.java   |  43 ++--
 .../apache/flink/ml/util/FlinkTestBase.scala    |   2 +-
 .../src/test/resources/log4j-test.properties    |  38 ++++
 .../src/test/resources/logback-test.xml         |  42 ++++
 .../MesosApplicationMasterRunner.java           |  39 +++-
 .../MesosFlinkResourceManagerTest.java          |  37 +++-
 .../BackPressureStatsTrackerITCase.java         |  19 +-
 .../StackTraceSampleCoordinatorITCase.java      |  19 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  31 ++-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  47 ++--
 .../flink/runtime/client/JobClientActor.java    |   5 +-
 .../runtime/client/JobListeningContext.java     |  22 +-
 .../clusterframework/BootstrapTools.java        |  17 +-
 .../HighAvailabilityServices.java               |   7 +
 .../HighAvailabilityServicesUtils.java          |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   9 +
 .../ZooKeeperSubmittedJobGraphStore.java        |  12 +-
 .../ZooKeeperLeaderElectionService.java         | 184 ++++++++++------
 .../ZooKeeperLeaderRetrievalService.java        | 122 +++++++----
 .../minicluster/StandaloneMiniCluster.java      | 154 +++++++++++++
 .../runtime/query/QueryableStateClient.java     |  10 +-
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |  16 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  67 ++----
 .../flink/runtime/util/SerializedThrowable.java |   4 +
 .../flink/runtime/util/ZooKeeperUtils.java      |  27 ---
 .../flink/runtime/jobmanager/JobManager.scala   | 158 +++++++-------
 .../runtime/messages/TaskManagerMessages.scala  |  13 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 142 +++++++-----
 .../minicluster/LocalFlinkMiniCluster.scala     |  41 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |  71 +++---
 .../checkpoint/CoordinatorShutdownTest.java     |   3 +-
 .../runtime/client/JobClientActorTest.java      |   8 +-
 .../clusterframework/ClusterShutdownITCase.java | 201 ++++++++++-------
 .../clusterframework/ResourceManagerITCase.java | 178 +++++++++------
 .../clusterframework/ResourceManagerTest.java   | 128 ++++++++++-
 .../highavailability/ManualLeaderService.java   | 116 ++++++++++
 .../TestingManualHighAvailabilityServices.java  | 150 +++++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  35 +--
 .../JobManagerProcessReapingTest.java           |   3 +-
 .../jobmanager/JobManagerStartupTest.java       |   5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 188 ++++++++++------
 .../flink/runtime/jobmanager/JobSubmitTest.java |  15 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |   3 +-
 .../LeaderChangeJobRecoveryTest.java            |  32 ++-
 .../LeaderChangeStateCleanupTest.java           |  50 +++--
 .../LeaderElectionRetrievalTestingCluster.java  | 121 -----------
 .../TestingLeaderElectionService.java           |   4 +
 .../TestingLeaderRetrievalService.java          |   4 +
 .../ZooKeeperLeaderElectionTest.java            | 109 +++++-----
 .../ZooKeeperLeaderRetrievalTest.java           |  91 ++++----
 .../runtime/metrics/TaskManagerMetricsTest.java |  26 ++-
 ...askManagerComponentsStartupShutdownTest.java |  47 ++--
 .../TaskManagerConfigurationTest.java           |  76 ++++---
 .../TaskManagerProcessReapingTestBase.java      |  73 +++++--
 .../TaskManagerRegistrationTest.java            | 217 +++++++++++--------
 .../taskmanager/TaskManagerStartupTest.java     |  62 +++++-
 .../runtime/taskmanager/TaskManagerTest.java    | 136 ++++++++++--
 .../jobmanager/JobManagerRegistrationTest.scala |  88 +++++---
 .../runtime/testingUtils/TestingCluster.scala   |  37 +++-
 .../runtime/testingUtils/TestingUtils.scala     | 176 ++++++---------
 .../org/apache/flink/api/scala/FlinkShell.scala |  14 +-
 .../flink/api/scala/ScalaShellITCase.scala      | 104 ++++-----
 .../environment/RemoteStreamEnvironment.java    |   6 +-
 .../streaming/util/TestStreamEnvironment.java   |  79 ++++++-
 .../flink/test/util/JavaProgramTestBase.java    |   4 +-
 .../test/util/MultipleProgramsTestBase.java     |   2 +-
 .../apache/flink/test/util/TestEnvironment.java | 133 ++++++++++--
 .../accumulators/AccumulatorErrorITCase.java    |  49 ++---
 ...tractEventTimeWindowCheckpointingITCase.java |  21 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  21 +-
 .../StreamFaultToleranceTestBase.java           |   4 +-
 .../WindowCheckpointingITCase.java              |  18 +-
 .../test/classloading/ClassLoaderITCase.java    | 136 +++++++-----
 .../jar/CheckpointedStreamingProgram.java       |   9 +-
 .../jar/CheckpointingCustomKvStateProgram.java  |   9 +-
 .../jar/CustomInputSplitProgram.java            |  12 +-
 .../classloading/jar/CustomKvStateProgram.java  |  15 +-
 .../test/classloading/jar/KMeansForTest.java    |  17 +-
 .../jar/LegacyCheckpointedStreamingProgram.java |   6 +-
 .../jar/StreamingCustomInputSplitProgram.java   |  11 +-
 .../test/classloading/jar/StreamingProgram.java |   7 +-
 .../test/classloading/jar/UserCodeType.java     |   6 +-
 .../clients/examples/JobRetrievalITCase.java    |   7 +-
 .../CustomDistributionITCase.java               |   2 +-
 .../RemoteEnvironmentITCase.java                |  46 ++--
 .../flink/test/misc/AutoParallelismITCase.java  |   8 +-
 .../test/misc/CustomSerializationITCase.java    |  57 ++---
 .../test/misc/MiscellaneousIssuesITCase.java    |  52 ++---
 ...SuccessAfterNetworkBuffersFailureITCase.java |  28 +--
 .../query/AbstractQueryableStateITCase.java     |  33 ++-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  18 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  13 +-
 .../flink/test/recovery/FastFailuresITCase.java |  10 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |  42 +++-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  24 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |  28 ++-
 .../recovery/ProcessFailureCancelingITCase.java |  30 ++-
 .../TaskManagerFailureRecoveryITCase.java       |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |  17 +-
 .../AbstractOperatorRestoreTestBase.java        |  39 +++-
 .../test/streaming/runtime/TimestampITCase.java |  63 +++---
 .../jobmanager/JobManagerFailsITCase.scala      |  17 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |   2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   7 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  40 +++-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   2 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  38 +++-
 .../apache/flink/yarn/YarnClusterClient.java    |  36 +--
 .../apache/flink/yarn/YarnClusterClientV2.java  |   5 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   5 +-
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |  13 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   7 +-
 135 files changed, 3474 insertions(+), 2131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 0d61cbd..74d5f5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -263,7 +263,11 @@ public class CliFrontend {
 		}
 		finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown();
+				} catch (Exception e) {
+					LOG.warn("Could not properly shut down the cluster client.", e);
+				}
 			}
 			if (program != null) {
 				program.deleteExtractedLibraries();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index c58c74c..a4cb479 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -75,11 +75,11 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	 * @param config The Flink config to use
 	 * @param userJarFiles User jar files to include in the classpath of the cluster.
 	 * @return The client to communicate with the cluster which the CustomCommandLine brought up.
-	 * @throws UnsupportedOperationException if the operation is not supported
+	 * @throws Exception if the cluster could not be created
 	 */
 	ClusterType createCluster(
 			String applicationName,
 			CommandLine commandLine,
 			Configuration config,
-			List<URL> userJarFiles) throws UnsupportedOperationException;
+			List<URL> userJarFiles) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0f88f7c..6770eee 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -42,6 +42,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.client.JobRetrievalException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -72,7 +75,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
  */
@@ -95,6 +97,9 @@ public abstract class ClusterClient {
 	/** Lookup timeout for the job manager retrieval service */
 	private final FiniteDuration lookupTimeout;
 
+	/** High availability services used for leader retrieval */
+	protected final HighAvailabilityServices highAvailabilityServices;
+
 	/** Flag indicating whether to print execution updates to sysout */
 	private boolean printStatusDuringExecution = true;
 
@@ -119,10 +124,25 @@ public abstract class ClusterClient {
 	 *
 	 * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
 	 *
-	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 * @throws Exception if the high availability services cannot be created
 	 */
-	public ClusterClient(Configuration flinkConfig) throws IOException {
+	public ClusterClient(Configuration flinkConfig) throws Exception {
+		this(flinkConfig,
+			HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				flinkConfig,
+				Executors.directExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+	}
 
+	/**
+	 * Creates an instance that submits the programs to the JobManager defined in the
+	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
+	 * if that is not possible.
+	 *
+	 * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
+	 * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
+	 */
+	public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
 
@@ -130,6 +150,8 @@ public abstract class ClusterClient {
 		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
 		this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);
+
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 	}
 
 	// ------------------------------------------------------------------------
@@ -202,12 +224,16 @@ public abstract class ClusterClient {
 	/**
 	 * Shuts down the client. This stops the internal actor system and actors.
 	 */
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		synchronized (this) {
 			try {
 				finalizeCluster();
 			} finally {
-				this.actorSystemLoader.shutdown();
+				actorSystemLoader.shutdown();
+			}
+
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
 			}
 		}
 	}
@@ -241,7 +267,8 @@ public abstract class ClusterClient {
 		try {
 			LeaderConnectionInfo leaderConnectionInfo =
 				LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-					LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true), timeout);
+					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+					timeout);
 
 			return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
 		} catch (Exception e) {
@@ -411,17 +438,17 @@ public abstract class ClusterClient {
 
 		waitForClusterToBeReady();
 
-		final LeaderRetrievalService leaderRetrievalService;
-		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-		} catch (Exception e) {
-			throw new ProgramInvocationException("Could not create the leader retrieval service", e);
-		}
-
 		try {
 			logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
-			this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
-				leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
+			this.lastJobExecutionResult = JobClient.submitJobAndWait(
+				actorSystemLoader.get(),
+				flinkConfig,
+				highAvailabilityServices,
+				jobGraph,
+				timeout,
+				printStatusDuringExecution,
+				classLoader);
+
 			return this.lastJobExecutionResult;
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
@@ -462,13 +489,6 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
-		final LeaderRetrievalService leaderRetrievalService;
-		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-		} catch (Exception e) {
-			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
-		}
-
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -477,13 +497,13 @@ public abstract class ClusterClient {
 		}
 
 		final JobListeningContext listeningContext = JobClient.attachToRunningJob(
-				jobID,
-				jobManagerGateway,
-				flinkConfig,
-				actorSystemLoader.get(),
-				leaderRetrievalService,
-				timeout,
-				printStatusDuringExecution);
+			jobID,
+			jobManagerGateway,
+			flinkConfig,
+			actorSystemLoader.get(),
+			highAvailabilityServices,
+			timeout,
+			printStatusDuringExecution);
 
 		return JobClient.awaitJobResult(listeningContext);
 	}
@@ -496,13 +516,6 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
-		final LeaderRetrievalService leaderRetrievalService;
-		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-		} catch (Exception e) {
-			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
-		}
-
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -515,7 +528,7 @@ public abstract class ClusterClient {
 				jobManagerGateway,
 				flinkConfig,
 				actorSystemLoader.get(),
-				leaderRetrievalService,
+				highAvailabilityServices,
 				timeout,
 				printStatusDuringExecution);
 	}
@@ -721,7 +734,7 @@ public abstract class ClusterClient {
 	public ActorGateway getJobManagerGateway() throws Exception {
 		LOG.debug("Looking up JobManager");
 		return LeaderRetrievalUtils.retrieveLeaderGateway(
-			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true),
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 			actorSystemLoader.get(),
 			lookupTimeout);
 	}
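
As shown above, the per-call LeaderRetrievalUtils.createLeaderRetrievalService
instances are gone; retrievers now come from the shared HA services, keyed by
the new default job id. A minimal sketch of the pattern (illustrative, using
the names from this diff):

    LeaderRetrievalService retriever =
        highAvailabilityServices.getJobManagerLeaderRetriever(
            HighAvailabilityServices.DEFAULT_JOB_ID);

    ActorGateway jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
        retriever,
        actorSystemLoader.get(),
        lookupTimeout);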

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc9..fd179c0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -22,12 +22,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
@@ -38,10 +38,14 @@ import java.util.List;
  */
 public class StandaloneClusterClient extends ClusterClient {
 
-	public StandaloneClusterClient(Configuration config) throws IOException {
+	public StandaloneClusterClient(Configuration config) throws Exception {
 		super(config);
 	}
 
+	public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices) {
+		super(config, highAvailabilityServices);
+	}
+
 	@Override
 	public void waitForClusterToBeReady() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index d8fb3de..be93949 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
@@ -33,7 +32,6 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
@@ -48,26 +46,20 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 	}
 
 	@Test
-	public void testUnresolvableHostname1() {
+	public void testUnresolvableHostname1() throws Exception {
 
 		RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 		try {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (ProgramInvocationException e) {
+		catch (UnknownHostException ignored) {
 			// that is what we want!
-			assertTrue(e.getCause() instanceof UnknownHostException);
-		}
-		catch (Exception e) {
-			System.err.println("Wrong exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
 		}
 	}
 
 	@Test
-	public void testUnresolvableHostname2() {
+	public void testUnresolvableHostname2() throws Exception {
 
 		InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
 		RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
@@ -76,14 +68,8 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (ProgramInvocationException e) {
+		catch (UnknownHostException ignored) {
 			// that is what we want!
-			assertTrue(e.getCause() instanceof UnknownHostException);
-		}
-		catch (Exception e) {
-			System.err.println("Wrong exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index da297d6..b7ade2a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -56,7 +56,6 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
@@ -199,7 +198,7 @@ public class ClientTest extends TestLogger {
 	 * This test verifies correct job submission messaging logic and plan translation calls.
 	 */
 	@Test
-	public void shouldSubmitToJobClient() throws IOException, ProgramInvocationException {
+	public void shouldSubmitToJobClient() throws Exception {
 		jobManagerSystem.actorOf(
 			Props.create(SuccessReturningActor.class),
 			JobMaster.JOB_MANAGER_NAME);
@@ -217,7 +216,7 @@ public class ClientTest extends TestLogger {
 	 * This test verifies that the correct exception is thrown when the job submission fails.
 	 */
 	@Test
-	public void shouldSubmitToJobClientFails() throws IOException {
+	public void shouldSubmitToJobClientFails() throws Exception {
 			jobManagerSystem.actorOf(
 				Props.create(FailureReturningActor.class),
 				JobMaster.JOB_MANAGER_NAME);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index fc10f65..0ecdc2c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.util.StandaloneUtils;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -32,7 +34,7 @@ import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
 
 /**
- * Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
+ * Tests that verify that the LeaderRetrievalService correctly handles non-resolvable host names
  * and does not fail with another exception
  */
 public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
@@ -48,21 +50,16 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 	 * Tests that the StandaloneLeaderRetrievalService resolves host names if specified.
 	 */
 	@Test
-	public void testUnresolvableHostname1() {
+	public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
+		Configuration config = new Configuration();
 
-		try {
-			Configuration config = new Configuration();
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
-
-			LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
-		}
-		catch (Exception e) {
-			System.err.println("Shouldn't throw an exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		StandaloneUtils.createLeaderRetrievalService(
+			config,
+			false,
+			JobMaster.JOB_MANAGER_NAME);
 	}
 
 	/*
@@ -77,7 +74,10 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-			LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
+			StandaloneUtils.createLeaderRetrievalService(
+				config,
+				true,
+				JobMaster.JOB_MANAGER_NAME);
 			fail("This should fail with an UnknownHostException");
 		}
 		catch (UnknownHostException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 5f5209a..063a363 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,14 +19,16 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
+import java.net.URL;
+import java.util.Collections;
 
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,8 +45,9 @@ public class AvroExternalJarProgramITCase extends TestLogger {
 		LocalFlinkMiniCluster testMiniCluster = null;
 
 		try {
+			int parallelism = 4;
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 			testMiniCluster = new LocalFlinkMiniCluster(config, false);
 			testMiniCluster.start();
 
@@ -53,15 +56,16 @@ public class AvroExternalJarProgramITCase extends TestLogger {
 
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
 
+			TestEnvironment.setAsContext(
+				testMiniCluster,
+				parallelism,
+				Collections.singleton(new Path(jarFile)),
+				Collections.<URL>emptyList());
 
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
 
-			ClusterClient client = new StandaloneClusterClient(config);
-
-			client.setPrintStatusDuringExecution(false);
-			client.run(program, 4);
-
+			program.invokeInteractiveModeForExecution();
 		}
 		catch (Throwable t) {
 			System.err.println(t.getMessage());
@@ -69,6 +73,8 @@ public class AvroExternalJarProgramITCase extends TestLogger {
 			Assert.fail("Error during the packaged program execution: " + t.getMessage());
 		}
 		finally {
+			TestEnvironment.unsetAsContext();
+
 			if (testMiniCluster != null) {
 				try {
 					testMiniCluster.stop();
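
The test now executes the packaged program against a TestEnvironment that is
installed as the context execution environment, instead of submitting through
a ClusterClient. A minimal sketch of the pattern (assuming a started mini
cluster):

    TestEnvironment.setAsContext(
        testMiniCluster,
        parallelism,
        Collections.singleton(new Path(jarFile)),
        Collections.<URL>emptyList());
    try {
        program.invokeInteractiveModeForExecution();
    } finally {
        TestEnvironment.unsetAsContext();
    }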

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 2085169..39b2b8f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -180,13 +180,14 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 		// ---------- Produce an event time stream into Kafka -------------------
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+			private static final long serialVersionUID = -2255105836471289626L;
 			boolean running = true;
 
 			@Override
@@ -208,6 +209,8 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
 		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+			private static final long serialVersionUID = -6730989584364230617L;
+
 			@Override
 			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
 				return (int)(next % 3);
@@ -219,7 +222,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 		// ---------- Consume stream from Kafka -------------------
 
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -227,6 +230,8 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 		FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
 		kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+			private static final long serialVersionUID = -4834111073247835189L;
+
 			@Nullable
 			@Override
 			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
@@ -253,8 +258,12 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 	private static class TimestampValidatingOperator extends StreamSink<Long> {
 
+		private static final long serialVersionUID = 1353168781235526806L;
+
 		public TimestampValidatingOperator() {
 			super(new SinkFunction<Long>() {
+				private static final long serialVersionUID = -6676565693361786524L;
+
 				@Override
 				public void invoke(Long value) throws Exception {
 					throw new RuntimeException("Unexpected");
@@ -304,6 +313,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
 
+		private static final long serialVersionUID = 6966177118923713521L;
 		private final TypeInformation<Long> ti;
 		private final TypeSerializer<Long> ser;
 		long cnt = 0;
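
Note the recurring change in this and the following Kafka test files:
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort) is
replaced by StreamExecutionEnvironment.getExecutionEnvironment(). This works
because the test base is expected to install a context environment bound to
the mini cluster; a sketch of that setup (not the exact KafkaTestBase code):

    // in the test base setup, after starting the LocalFlinkMiniCluster (here: flink)
    TestStreamEnvironment.setAsContext(flink, parallelism);

    // in each test: returns the environment registered above
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();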

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index f5cb8c0..8cc735d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -87,7 +87,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		final int valuesCount = 20;
 		final int startFrom = 0;
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
 		readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
@@ -190,7 +190,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		// write a sequence from 0 to 99 to each of the 3 partitions.
 		final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		// NOTE: We are not enabling the checkpointing!
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index e748537..16a13c0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -34,7 +34,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
 	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
 
 	@BeforeClass
-	public static void prepare() throws IOException, ClassNotFoundException {
+	public static void prepare() throws ClassNotFoundException {
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting Kafka09SecuredRunITCase ");
 		LOG.info("-------------------------------------------------------------------------");

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ddac61c..ba83460 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -154,7 +154,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		try {
 			Properties properties = new Properties();
 
-			StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
 			see.getConfig().disableSysoutLogging();
 			see.setRestartStrategy(RestartStrategies.noRestart());
 			see.setParallelism(1);
@@ -173,22 +173,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			DataStream<String> stream = see.addSource(source);
 			stream.print();
 			see.execute("No broker test");
-		} catch(ProgramInvocationException pie) {
+		} catch(JobExecutionException jee) {
 			if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
-				assertTrue(pie.getCause() instanceof JobExecutionException);
-
-				JobExecutionException jee = (JobExecutionException) pie.getCause();
-
 				assertTrue(jee.getCause() instanceof TimeoutException);
 
 				TimeoutException te = (TimeoutException) jee.getCause();
 
 				assertEquals("Timeout expired while fetching topic metadata", te.getMessage());
 			} else {
-				assertTrue(pie.getCause() instanceof JobExecutionException);
-
-				JobExecutionException jee = (JobExecutionException) pie.getCause();
-
 				assertTrue(jee.getCause() instanceof RuntimeException);
 
 				RuntimeException re = (RuntimeException) jee.getCause();
@@ -208,7 +200,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.setParallelism(parallelism);
@@ -280,6 +272,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	public void runStartFromKafkaCommitOffsets() throws Exception {
 		final int parallelism = 3;
 		final int recordsInEachPartition = 300;
+		final int recordsToConsume = 150;
+		final int consumePause = 50;
 
 		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
@@ -294,7 +288,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			attempt++;
 			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
 
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.getConfig().disableSysoutLogging();
 			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 			env.setParallelism(parallelism);
@@ -302,13 +296,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 			env
 				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(50))
+				.map(new ThrottledMapper<String>(consumePause))
 				.map(new MapFunction<String, Object>() {
 					int count = 0;
 					@Override
 					public Object map(String value) throws Exception {
 						count++;
-						if (count == 150) {
+						if (count == recordsToConsume) {
 							throw new SuccessException();
 						}
 						return null;
@@ -329,7 +323,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
 
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
 		env2.getConfig().disableSysoutLogging();
 		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env2.setParallelism(parallelism);
@@ -375,7 +369,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.setParallelism(parallelism);
@@ -452,7 +446,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.setParallelism(parallelism);
 
@@ -510,7 +504,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
 
 		// setup and run the latest-consuming job
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.setParallelism(parallelism);
 
@@ -541,7 +535,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				try {
 					env.execute(consumeExtraRecordsJobName);
 				} catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						error.set(t);
 					}
 				}
@@ -555,7 +549,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			consumeExtraRecordsJobName);
 
 		// setup the extra records writing job
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env2.setParallelism(parallelism);
 
 		DataStream<Tuple2<Integer, Integer>> extraRecordsStream = env2
 			.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
@@ -577,7 +573,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				public void cancel() {
 					running = false;
 				}
-			}).setParallelism(parallelism);
+			});
 
 		kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null);
 
@@ -626,7 +622,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final String topicName = writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.setParallelism(parallelism);
 
@@ -685,7 +681,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.setParallelism(parallelism);
 
@@ -751,7 +747,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time
 
 		final StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+				StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.enableCheckpointing(500);
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
@@ -878,7 +874,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, parallelism, 1);
 
 		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
 				topic, parallelism, numElementsPerPartition, true);
 
@@ -887,7 +883,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DeserializationSchema<Integer> schema =
 				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
@@ -927,7 +923,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, numPartitions, 1);
 
 		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
 				topic, numPartitions, numElementsPerPartition, false);
 
@@ -936,7 +932,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DeserializationSchema<Integer> schema =
 				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
@@ -975,7 +971,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, numPartitions, 1);
 
 		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
 				topic, numPartitions, numElementsPerPartition, true);
 
@@ -984,7 +980,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DeserializationSchema<Integer> schema =
 				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
 		// set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions.
@@ -1033,7 +1029,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 					env.setParallelism(parallelism);
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
@@ -1107,7 +1103,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			@Override
 			public void run() {
 				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 					env.setParallelism(parallelism);
 					env.enableCheckpointing(100);
 					env.getConfig().disableSysoutLogging();
@@ -1163,7 +1159,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DeserializationSchema<Integer> schema =
 				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(12); // needs to be more than the mini cluster has slots
 		env.getConfig().disableSysoutLogging();
 
@@ -1180,7 +1176,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			env.execute("test fail on deploy");
 			fail("this test should fail with an exception");
 		}
-		catch (ProgramInvocationException e) {
+		catch (JobExecutionException e) {
 
 			// validate that we failed due to a NoResourceAvailableException
 			Throwable cause = e.getCause();
@@ -1209,7 +1205,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final int NUM_TOPICS = 5;
 		final int NUM_ELEMENTS = 20;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		
 		// create topics with content
@@ -1220,6 +1216,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			// create topic
 			createTestTopic(topic, i + 1 /*partitions*/, 1);
 		}
+
+		// before FLINK-6078 the RemoteExecutionEnvironment set the parallelism to 1 as well
+		env.setParallelism(1);
+
 		// run first job, producing into all topics
 		DataStream<Tuple3<Integer, Integer, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {
 
@@ -1249,7 +1249,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.execute("Write to topics");
 
 		// run second job consuming from multiple topics
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
 		stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
@@ -1357,7 +1357,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
 				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(100);
@@ -1457,7 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		createTestTopic(topic, parallelism, 2);
 
 		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
 				topic, parallelism, numElementsPerPartition, true);
 
@@ -1472,7 +1472,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DeserializationSchema<Integer> schema =
 				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.enableCheckpointing(500);
 		env.setRestartStrategy(RestartStrategies.noRestart());
@@ -1503,7 +1503,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// ----------- Write some data into Kafka -------------------
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -1535,7 +1535,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// ----------- Read the data again -------------------
 
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -1590,7 +1590,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// ----------- Write some data into Kafka -------------------
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -1621,7 +1621,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// ----------- Read the data again -------------------
 
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
@@ -1661,7 +1661,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
 
 		// read using custom schema
-		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
 		env1.setParallelism(1);
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env1.getConfig().disableSysoutLogging();
@@ -1700,7 +1700,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			public void run() {
 				try {
 					// start job writing & reading data.
-					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
 					env1.setParallelism(1);
 					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 					env1.getConfig().disableSysoutLogging();
@@ -1741,7 +1741,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env1.execute("Metrics test job");
 				} catch(Throwable t) {
 					LOG.warn("Got exception during execution", t);
-					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+					if(!(t instanceof JobCancellationException)) { // we'll cancel the job
 						error.f0 = t;
 					}
 				}
@@ -1994,7 +1994,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 			createTestTopic(topicName, parallelism, replicationFactor);
 
-			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 			writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 			writeEnv.getConfig().disableSysoutLogging();
 			
@@ -2046,7 +2046,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 			
-			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 			readEnv.getConfig().disableSysoutLogging();
 			readEnv.setParallelism(parallelism);

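Note on the recurring change in this file: every hunk swaps StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort) for StreamExecutionEnvironment.getExecutionEnvironment(), which now resolves to the mini cluster because KafkaTestBase (further down in this commit) installs a TestStreamEnvironment as the context. A minimal sketch of the resulting pattern, assuming a running LocalFlinkMiniCluster; the job body itself is hypothetical:

    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.streaming.util.TestStreamEnvironment;

    public class ContextEnvironmentSketch {

        public static void runSketch(LocalFlinkMiniCluster flink, int parallelism) throws Exception {
            // register the mini cluster as the target of getExecutionEnvironment()
            TestStreamEnvironment.setAsContext(flink, parallelism);
            try {
                // no host/port needed; this binds to the mini cluster
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.generateSequence(1, 10).addSink(new DiscardingSink<Long>());
                // failures now surface as JobExecutionException rather than the
                // client-side ProgramInvocationException (cf. the catch block change above)
                env.execute("context-bound sketch");
            } finally {
                TestStreamEnvironment.unsetAsContext();
            }
        }
    }
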
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index c925c8f..6f61392 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -71,7 +71,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
 
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setRestartStrategy(RestartStrategies.noRestart());
 			env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 954dc7d..f688660 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -59,6 +60,12 @@ import static org.junit.Assert.fail;
 public class KafkaShortRetentionTestBase implements Serializable {
 	
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+
+	protected static final int NUM_TMS = 1;
+
+	protected static final int TM_SLOTS = 8;
+
+	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
 	
 	private static KafkaTestEnvironment kafkaServer;
 	private static Properties standardProps;
@@ -97,17 +104,21 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		standardProps = kafkaServer.getStandardProperties();
 
 		// start also a re-usable Flink mini cluster
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
 		flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
+
+		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
 	public static void shutDownServices() {
+		TestStreamEnvironment.unsetAsContext();
+
 		if (flink != null) {
 			flink.shutdown();
 		}
@@ -135,8 +146,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		tprops.setProperty("retention.ms", "250");
 		kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
 
-		final StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
 		env.getConfig().disableSysoutLogging();

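Condensed, the lifecycle this hunk introduces looks as follows (JUnit 4, field names as in the patch); setAsContext must be undone in the teardown so that later tests see the default context again:

    @BeforeClass
    public static void prepare() throws Exception {
        flink = new LocalFlinkMiniCluster(flinkConfig, false);
        flink.start();
        // all subsequent getExecutionEnvironment() calls target this cluster
        TestStreamEnvironment.setAsContext(flink, PARALLELISM);
    }

    @AfterClass
    public static void shutDownServices() {
        // restore the default context before taking the cluster down
        TestStreamEnvironment.unsetAsContext();
        if (flink != null) {
            flink.shutdown();
        }
    }
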
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index a21a239..1837af6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -40,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -65,14 +65,18 @@ public abstract class KafkaTestBase extends TestLogger {
 	
 	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
 
+	protected static final int NUM_TMS = 1;
+
+	protected static final int TM_SLOTS = 8;
+
+	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
+
 	protected static String brokerConnectionStrings;
 
 	protected static Properties standardProps;
 	
 	protected static LocalFlinkMiniCluster flink;
 
-	protected static int flinkPort;
-
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
 	protected static KafkaTestEnvironment kafkaServer;
@@ -87,7 +91,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	// ------------------------------------------------------------------------
 	
 	@BeforeClass
-	public static void prepare() throws IOException, ClassNotFoundException {
+	public static void prepare() throws ClassNotFoundException {
 
 		LOG.info("-------------------------------------------------------------------------");
 		LOG.info("    Starting KafkaTestBase ");
@@ -95,6 +99,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
 		startClusters(false);
 
+		TestStreamEnvironment.setAsContext(flink, PARALLELISM);
 	}
 
 	@AfterClass
@@ -104,6 +109,8 @@ public abstract class KafkaTestBase extends TestLogger {
 		LOG.info("    Shut down KafkaTestBase ");
 		LOG.info("-------------------------------------------------------------------------");
 
+		TestStreamEnvironment.unsetAsContext();
+
 		shutdownClusters();
 
 		LOG.info("-------------------------------------------------------------------------");
@@ -113,8 +120,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static Configuration getFlinkConfiguration() {
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
@@ -147,14 +154,10 @@ public abstract class KafkaTestBase extends TestLogger {
 		// start also a re-usable Flink mini cluster
 		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
 		flink.start();
-
-		flinkPort = flink.getLeaderRPCPort();
-
 	}
 
 	protected static void shutdownClusters() {
 
-		flinkPort = -1;
 		if (flink != null) {
 			flink.shutdown();
 		}

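With the context environment installed in KafkaTestBase, flinkPort becomes obsolete and the slot budget is fixed at NUM_TMS * TM_SLOTS. A job asking for more slots than that fails at deployment, which is what the "fail on deploy" test further up asserts; a sketch under these assumptions (the exact nesting of the cause chain and the package of NoResourceAvailableException, org.apache.flink.runtime.jobmanager.scheduler, are assumptions):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(PARALLELISM + 1); // one more slot than the mini cluster offers
    env.generateSequence(1, 100).addSink(new DiscardingSink<Long>());
    try {
        env.execute("over-parallel job");
        fail("deployment should have failed");
    } catch (JobExecutionException e) {
        // walk the cause chain; the scheduler exception may be nested
        Throwable cause = e.getCause();
        while (cause != null && !(cause instanceof NoResourceAvailableException)) {
            cause = cause.getCause();
        }
        assertNotNull("expected a NoResourceAvailableException", cause);
    }
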
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
index 6bdfb48..16c226f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 2b7f357..21794f9 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -205,7 +205,7 @@ public class FlinkClient {
 		final ClusterClient client;
 		try {
 			client = new StandaloneClusterClient(configuration);
-		} catch (final IOException e) {
+		} catch (final Exception e) {
 			throw new RuntimeException("Could not establish a connection to the job manager", e);
 		}
 
@@ -245,7 +245,7 @@ public class FlinkClient {
 		final ClusterClient client;
 		try {
 			client = new StandaloneClusterClient(configuration);
-		} catch (final IOException e) {
+		} catch (final Exception e) {
 			throw new RuntimeException("Could not establish a connection to the job manager", e);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 7a25636..f9b6a21 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -22,8 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -41,8 +42,9 @@ public class CollectITCase extends TestLogger {
 		try {
 			cluster.start();
 
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
+			TestStreamEnvironment.setAsContext(cluster, 1);
+
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 	
 			final long N = 10;
 			DataStream<Long> stream = env.generateSequence(1, N);
@@ -57,6 +59,7 @@ public class CollectITCase extends TestLogger {
 			assertEquals("received wrong number of elements", N + 1, i);
 		}
 		finally {
+			TestStreamEnvironment.unsetAsContext();
 			cluster.stop();
 		}
 	}

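For reference, DataStreamUtils.collect (from flink-streaming-contrib, the module under test here) launches the job and streams results back through an iterator, so it picks up the context environment as well; a sketch under the same setup, with the counting logic hypothetical:

    TestStreamEnvironment.setAsContext(cluster, 1);
    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.generateSequence(1, 10);
        // collect() starts the job and pulls elements back to the client
        Iterator<Long> it = DataStreamUtils.collect(stream);
        long count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        assertEquals(10L, count);
    } finally {
        TestStreamEnvironment.unsetAsContext();
        cluster.stop();
    }
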
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index ad434d4..111d421 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -45,27 +46,19 @@ public class DegreesWithExceptionITCase extends TestLogger {
 
 	@BeforeClass
 	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
 	}
 
 	@AfterClass
 	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-		}
+		cluster.stop();
+
+		TestEnvironment.unsetAsContext();
 	}
 
 	/**
@@ -74,8 +67,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testOutDegreesInvalidEdgeSrcId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 		
@@ -98,8 +90,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testInDegreesInvalidEdgeTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -122,8 +113,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGetDegreesInvalidEdgeTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -146,8 +136,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGetDegreesInvalidEdgeSrcId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -170,8 +159,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 

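The Gelly ITCases apply the same migration to the batch API via the TestEnvironment helper from flink-test-utils; the next two files additionally add a fail(...) call so that a silently succeeding job no longer passes the test. The combined idiom, with the graph construction elided as in the tests above:

    TestEnvironment.setAsContext(cluster, PARALLELISM);
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        // ... build the graph with an invalid edge id, as in the tests above ...
        env.execute();
        fail("Expected an exception.");
    } catch (Exception e) {
        // expected: the job must fail
    } finally {
        TestEnvironment.unsetAsContext();
    }
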
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index d090d3c..7a0a30c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -48,27 +49,19 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 
 	@BeforeClass
 	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
 	}
 
 	@AfterClass
 	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-		}
+		cluster.stop();
+
+		TestEnvironment.unsetAsContext();
 	}
 
 	/**
@@ -77,8 +70,7 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnEdgesInvalidEdgeSrcId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -91,6 +83,8 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 
 			verticesWithAllNeighbors.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			env.execute();
+
+			fail("Expected an exception.");
 		} catch (Exception e) {
 			// We expect the job to fail with an exception
 		}
@@ -102,8 +96,7 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnEdgesInvalidEdgeTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -116,6 +109,8 @@ public class ReduceOnEdgesWithExceptionITCase extends TestLogger {
 
 			verticesWithAllNeighbors.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			env.execute();
+
+			fail("Expected an exception.");
 		} catch (Exception e) {
 			// We expect the job to fail with an exception
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index afe2e18..b337bca 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -49,27 +50,19 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
 	@BeforeClass
 	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
 	}
 
 	@AfterClass
 	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-		}
+		cluster.stop();
+
+		TestEnvironment.unsetAsContext();
 	}
 
 	/**
@@ -79,8 +72,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -93,6 +85,8 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
 			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			env.execute();
+
+			fail("Expected an exception.");
 		} catch (Exception e) {
 			// We expect the job to fail with an exception
 		}
@@ -105,8 +99,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -119,6 +112,8 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 
 			verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			env.execute();
+
+			fail("Expected an exception.");
 		} catch (Exception e) {
 			// We expect the job to fail with an exception
 		}
@@ -131,8 +126,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -157,8 +151,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends TestLogger {
 	@Test
 	public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
 
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 6353d6a..3ee7a99 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -62,7 +62,7 @@ trait FlinkTestBase extends BeforeAndAfter {
       false,
       true)
 
-    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    val clusterEnvironment = new TestEnvironment(cl, parallelism, false)
     clusterEnvironment.setAsContext()
 
     cluster = Some(cl)