You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/26 16:23:21 UTC

[4/4] flink git commit: [tests] Adjust tests for dedicated streaming mode and clean up test bases.

[tests] Adjust tests for dedicated streaming mode and clean up test bases.

This closes #718


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04377224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04377224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04377224

Branch: refs/heads/master
Commit: 0437722449c59fdc21fc5d18a59e5e0b961208af
Parents: efec229
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:24:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:23 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../JobManagerProcessReapingTest.java           |  3 +-
 .../jobmanager/JobManagerStartupTest.java       | 11 ++-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  3 +-
 ...askManagerComponentsStartupShutdownTest.java |  4 +-
 .../TaskManagerProcessReapingTest.java          |  5 +-
 .../TaskManagerRegistrationTest.java            |  9 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 12 ++-
 .../taskmanager/TestManagerStartupTest.java     | 10 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  6 +-
 .../runtime/testingUtils/TestingCluster.scala   | 24 +++--
 .../runtime/testingUtils/TestingUtils.scala     | 28 +-----
 .../api/complex/ComplexIntegrationTest.java     |  9 +-
 .../util/StreamingMultipleProgramsTestBase.java | 53 ++++++-----
 .../util/StreamingProgramTestBase.java          |  2 +-
 .../scala/table/test/AggregationsITCase.scala   | 20 ++---
 .../flink/api/scala/table/test/AsITCase.scala   | 21 ++---
 .../api/scala/table/test/CastingITCase.scala    | 14 +--
 .../scala/table/test/ExpressionsITCase.scala    | 22 ++---
 .../api/scala/table/test/FilterITCase.scala     | 20 ++---
 .../table/test/GroupedAggreagationsITCase.scala | 14 +--
 .../flink/api/scala/table/test/JoinITCase.scala | 22 ++---
 .../api/scala/table/test/SelectITCase.scala     | 22 ++---
 .../table/test/StringExpressionsITCase.scala    | 16 ++--
 .../util/AbstractMultipleProgramsTestBase.java  | 84 ------------------
 .../flink/test/util/AbstractTestBase.java       | 14 +--
 .../test/util/MultipleProgramsTestBase.java     | 52 ++++++++++-
 .../apache/flink/test/util/TestBaseUtils.java   | 92 +++++++++++---------
 .../apache/flink/test/util/FlinkTestBase.scala  |  8 +-
 .../test/util/ForkableFlinkMiniCluster.scala    | 21 +++--
 .../AbstractProcessFailureRecoveryTest.java     |  5 +-
 .../recovery/ProcessFailureCancelingITCase.java |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  9 +-
 .../api/scala/actions/CountCollectITCase.scala  |  2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |  6 +-
 .../hadoop/mapred/WordCountMapredITCase.scala   |  5 +-
 .../mapreduce/WordCountMapreduceITCase.scala    |  5 +-
 .../scala/io/ScalaCsvReaderWithPOJOITCase.scala |  6 +-
 .../api/scala/operators/AggregateITCase.scala   |  6 +-
 .../api/scala/operators/CoGroupITCase.scala     |  6 +-
 .../flink/api/scala/operators/CrossITCase.scala |  6 +-
 .../api/scala/operators/DistinctITCase.scala    |  6 +-
 .../api/scala/operators/ExamplesITCase.scala    | 13 ++-
 .../api/scala/operators/FilterITCase.scala      | 10 +--
 .../api/scala/operators/FirstNITCase.scala      |  6 +-
 .../api/scala/operators/FlatMapITCase.scala     |  6 +-
 .../scala/operators/GroupCombineITCase.scala    |  5 +-
 .../api/scala/operators/GroupReduceITCase.scala |  6 +-
 .../flink/api/scala/operators/JoinITCase.scala  |  6 +-
 .../flink/api/scala/operators/MapITCase.scala   |  6 +-
 .../api/scala/operators/PartitionITCase.scala   |  6 +-
 .../api/scala/operators/ReduceITCase.scala      |  6 +-
 .../api/scala/operators/SumMinMaxITCase.scala   |  2 +-
 .../flink/api/scala/operators/UnionITCase.scala |  6 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala | 22 ++---
 .../yarn/appMaster/YarnTaskManagerRunner.java   | 15 +++-
 .../apache/flink/yarn/ApplicationMaster.scala   | 21 +++--
 57 files changed, 421 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ba819ca..d5df633 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -945,7 +945,8 @@ object JobManager {
    * @param args command line arguments
    * @return Quadruple of configuration, execution mode and an optional listening address
    */
-  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+  def parseArgs(args: Array[String]):
+                     (Configuration, JobManagerMode, StreamingMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index a332ee1..be73bf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,6 +27,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -183,7 +184,7 @@ public class JobManagerProcessReapingTest {
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
-				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port);
+				JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.BATCH_ONLY, "localhost", port);
 				System.exit(0);
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 070e376..e7665a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.net.NetUtils;
 
 import org.junit.After;
@@ -80,7 +81,8 @@ public class JobManagerStartupTest {
 		}
 		
 		try {
-			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
+			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER,
+									StreamingMode.BATCH_ONLY, "localhost", portNum);
 			fail("this should throw an exception");
 		}
 		catch (Exception e) {
@@ -94,7 +96,9 @@ public class JobManagerStartupTest {
 			try {
 				portOccupier.close();
 			}
-			catch (Throwable t) {}
+			catch (Throwable t) {
+				// ignore
+			}
 		}
 	}
 
@@ -117,7 +121,8 @@ public class JobManagerStartupTest {
 		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
 
 		try {
-			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
+			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER,
+										StreamingMode.BATCH_ONLY, "localhost", portNum);
 			fail("this should fail with an exception");
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index e69687f..7b6d688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -61,7 +62,7 @@ public class JobSubmitTest {
 
 		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
 		jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
-		jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1();
+		jobManager = JobManager.startJobManagerActors(config, jobManagerSystem, StreamingMode.BATCH_ONLY)._1();
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7d4994d..a67cd00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -68,7 +69,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 		try {
 			actorSystem = AkkaUtils.createLocalActorSystem(config);
 
-			final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+			final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem, 
+																			StreamingMode.BATCH_ONLY)._1();
 
 			// create the components for the TaskManager manually
 			final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 91fadca..c55a721 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -84,7 +85,7 @@ public class TaskManagerProcessReapingTest {
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
-			JobManager.startJobManagerActors(new Configuration(), jmActorSystem);
+			JobManager.startJobManagerActors(new Configuration(), jmActorSystem, StreamingMode.BATCH_ONLY);
 
 			final int taskManagerPort = NetUtils.getAvailablePort();
 
@@ -198,7 +199,7 @@ public class TaskManagerProcessReapingTest {
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
 
-				TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
+				TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
 
 				// wait forever
 				Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 69964ea..3e5f2cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -152,7 +153,8 @@ public class TaskManagerRegistrationTest {
 				Thread.sleep(6000);
 
 				// now start the JobManager, with the regular akka URL
-				final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+				final ActorRef jobManager =
+						JobManager.startJobManagerActors(new Configuration(), actorSystem, StreamingMode.BATCH_ONLY)._1();
 
 				// check that the TaskManagers are registered
 				Future<Object> responseFuture = Patterns.ask(
@@ -371,6 +373,7 @@ public class TaskManagerRegistrationTest {
 							NONE_STRING, // no actor name -> random
 							new Some<String>(jobManager.path().toString()), // job manager path
 							false, // init network stack !!!
+							StreamingMode.BATCH_ONLY,
 							TaskManager.class);
 
 					watch(taskManager);
@@ -415,7 +418,8 @@ public class TaskManagerRegistrationTest {
 	private static ActorRef startJobManager() throws Exception {
 		// start the actors. don't give names, so they get generated names and we
 		// avoid conflicts with the actor names
-		return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+		return JobManager.startJobManagerActors(new Configuration(), actorSystem, 
+												NONE_STRING, NONE_STRING, StreamingMode.BATCH_ONLY)._1();
 	}
 
 	private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
@@ -430,6 +434,7 @@ public class TaskManagerRegistrationTest {
 				NONE_STRING, // no actor name -> random
 				new Some<String>(jobManagerUrl), // job manager path
 				true, // local network stack only
+				StreamingMode.BATCH_ONLY,
 				TaskManager.class);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a308c81..d33dcd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -27,9 +27,11 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -59,11 +61,14 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -903,7 +908,8 @@ public class TaskManagerTest {
 		return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort());
 	}
 
-	public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) {
+	public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, 
+												boolean useLocalCommunication, int dataPort) {
 		ActorRef taskManager = null;
 		try {
 			Configuration cfg = new Configuration();
@@ -916,7 +922,9 @@ public class TaskManagerTest {
 					cfg, system, "localhost",
 					Option.<String>empty(),
 					jobMangerUrl,
-					useLocalCommunication, TestingTaskManager.class);
+					useLocalCommunication,
+					StreamingMode.BATCH_ONLY,
+					TestingTaskManager.class);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
index 2b945e1..a033eb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.StreamingMode;
 import org.junit.Test;
 
 import java.io.File;
@@ -53,7 +54,8 @@ public class TestManagerStartupTest {
 			final int port = blocker.getLocalPort();
 
 			try {
-				TaskManager.runTaskManager(localHostName, port, new Configuration(), TaskManager.class);
+				TaskManager.runTaskManager(localHostName, port, new Configuration(),
+											StreamingMode.BATCH_ONLY, TaskManager.class);
 				fail("This should fail with an IOException");
 			}
 			catch (IOException e) {
@@ -101,7 +103,7 @@ public class TestManagerStartupTest {
 			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IOException e) {
@@ -138,7 +140,7 @@ public class TestManagerStartupTest {
 			// something invalid
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -150,7 +152,7 @@ public class TestManagerStartupTest {
 									ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
 			cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 5fde5ea..5ae6b5b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -23,6 +23,7 @@ import java.net.InetAddress
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
@@ -122,8 +123,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   private def startTestingJobManager(system: ActorSystem): ActorRef = {
-    val (jm: ActorRef, _) = JobManager.startJobManagerActors(
-                                        new Configuration(), _system, None, None)
+    val (jm: ActorRef, _) = JobManager.startJobManagerActors(new Configuration(), _system,
+                                                             None, None,
+                                                             StreamingMode.BATCH_ONLY)
     jm
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9f9fe93..a904f60 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, Props, ActorSystem}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
@@ -33,9 +34,19 @@ import org.apache.flink.runtime.taskmanager.TaskManager
  * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
  *                          otherwise false
  */
-class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
+class TestingCluster(userConfiguration: Configuration,
+                     singleActorSystem: Boolean,
+                     streamingMode: StreamingMode)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+  
 
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
+        = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+  
+  // --------------------------------------------------------------------------
+  
   override def generateConfiguration(userConfig: Configuration): Configuration = {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -52,13 +63,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
     val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
     executionRetries, delayBetweenRetries,
     timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
-
+    
     val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+    
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
       libraryCacheManager, archive, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout) with TestingJobManager)
+      delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
 
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
@@ -72,12 +83,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
     } else {
       None
     }
-
+    
     TaskManager.startTaskManagerComponentsAndActor(configuration, system,
                                                    HOSTNAME,
                                                    Some(tmActorName),
                                                    jobManagerPath,
                                                    numTaskManagers == 1,
+                                                   streamingMode,
                                                    classOf[TestingTaskManager])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 63dce31..3611633 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
+
 import com.typesafe.config.ConfigFactory
+
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.taskmanager.TaskManager
-import scala.concurrent.duration._
 
+import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext
 import scala.language.postfixOps
 
@@ -55,28 +56,7 @@ object TestingUtils {
   }
 
   def getDefaultTestingActorSystemConfig = testConfig
-
-
-  def startTestingTaskManagerWithConfiguration(hostname: String,
-                                               jobManagerURL: String,
-                                               config: Configuration,
-                                               system: ActorSystem) : ActorRef = {
-
-
-    TaskManager.startTaskManagerComponentsAndActor(config, system,
-                                                   hostname,
-                                                   None, // random actor name
-                                                   Some(jobManagerURL), // job manager
-                                                   true, classOf[TestingTaskManager])
-  }
-
-  def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
-
-    val jmURL = jobManager.path.toString
-    val config = new Configuration()
-
-    startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
-  }
+  
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 738654a..6fdd839 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -52,8 +52,6 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
@@ -65,7 +63,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
 public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 	// *************************************************************************
@@ -76,10 +74,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 	private String resultPath2;
 	private String expected1;
 	private String expected2;
-
-	public ComplexIntegrationTest(TestExecutionMode mode) {
-		super(mode);
-	}
+	
 
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 36e62f9..945ac07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 /**
  * Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -53,25 +53,34 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 
-	public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
-		super(mode);
-		switch(this.mode){
-			case CLUSTER:
-				TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
-				clusterEnv.setAsContext();
-				break;
-			case COLLECTION:
-				throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
-		}
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+	
+	// ------------------------------------------------------------------------
+	// Creates a TestStreamEnvironment on the shared cluster and sets it as the context environment.
+	public StreamingMultipleProgramsTestBase() {
+		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
+		clusterEnv.setAsContext();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Cluster setup & teardown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
 	}
 
-	@Parameterized.Parameters(name = "Execution mode = {0}")
-	public static Collection<TestExecutionMode[]> executionModes() {
-		TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
-		ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
-		temsList.add(tems);
-		return temsList;
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 0446b61..23be327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 public abstract class StreamingProgramTestBase extends AbstractTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 38be85e..3b7ab8d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAggregationTypes: Unit = {
+  def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -62,7 +62,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAggregationOnNonExistingField: Unit = {
+  def testAggregationOnNonExistingField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -74,7 +74,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test
-  def testWorkingAggregationDataTypes: Unit = {
+  def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(
@@ -88,7 +88,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test
-  def testAggregationWithArithmetic: Unit = {
+  def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
@@ -100,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingAggregationDataTypes: Unit = {
+  def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
@@ -112,7 +112,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNoNestedAggregations: Unit = {
+  def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 3a0cc69..c6259ec 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -32,6 +32,7 @@ import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
 class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  
   private var resultPath: String = null
   private var expected: String = ""
   private val _tempFolder = new TemporaryFolder()
@@ -45,12 +46,12 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAs: Unit = {
+  def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -66,7 +67,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
+  def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -77,7 +78,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
+  def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -88,7 +89,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
+  def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -99,7 +100,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference1: Unit = {
+  def testAsWithNonFieldReference1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
@@ -111,7 +112,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference2: Unit = {
+  def testAsWithNonFieldReference2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 9557985..736cf68 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -43,12 +43,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAutoCastToString: Unit = {
+  def testAutoCastToString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
@@ -60,7 +60,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testNumericAutoCastInArithmetic: Unit = {
+  def testNumericAutoCastInArithmetic(): Unit = {
 
     // don't test everything, just some common cast directions
 
@@ -74,7 +74,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testNumericAutoCastInComparison: Unit = {
+  def testNumericAutoCastInComparison(): Unit = {
 
     // don't test everything, just some common cast directions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8c60acf..d9de287 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testArithmetic: Unit = {
+  def testArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 10)).as('a, 'b)
@@ -61,7 +61,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testLogic: Unit = {
+  def testLogic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, true)).as('a, 'b)
@@ -73,7 +73,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testComparisons: Unit = {
+  def testComparisons(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
@@ -85,7 +85,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testBitwiseOperations: Unit = {
+  def testBitwiseOperations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -98,7 +98,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testBitwiseWithAutocast: Unit = {
+  def testBitwiseWithAutocast(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -111,7 +111,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testBitwiseWithNonWorkingAutocast: Unit = {
+  def testBitwiseWithNonWorkingAutocast(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -124,7 +124,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testCaseInsensitiveForAs: Unit = {
+  def testCaseInsensitiveForAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 982a302..bc51a7e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -47,12 +47,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAllRejectingFilter: Unit = {
+  def testAllRejectingFilter(): Unit = {
     /*
      * Test all-rejecting filter.
      */
@@ -67,7 +67,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testAllPassingFilter: Unit = {
+  def testAllPassingFilter(): Unit = {
     /*
      * Test all-passing filter.
      */
@@ -87,7 +87,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testFilterOnStringTupleField: Unit = {
+  def testFilterOnStringTupleField(): Unit = {
     /*
      * Test filter on String tuple field.
      */
@@ -100,7 +100,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testFilterOnIntegerTupleField: Unit = {
+  def testFilterOnIntegerTupleField(): Unit = {
     /*
      * Test filter on Integer tuple field.
      */
@@ -120,7 +120,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @Ignore
   @Test
-  def testFilterBasicType: Unit = {
+  def testFilterBasicType(): Unit = {
     /*
      * Test filter on basic type
      */
@@ -137,7 +137,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @Ignore
   @Test
-  def testFilterOnCustomType: Unit = {
+  def testFilterOnCustomType(): Unit = {
     /*
      * Test filter on custom type
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 1f29722..d76d75c 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testGroupingOnNonExistentField: Unit = {
+  def testGroupingOnNonExistentField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -63,7 +63,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @Test
-  def testGroupedAggregate: Unit = {
+  def testGroupedAggregate(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
     // if we don't want the key in the output
@@ -79,7 +79,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @Test
-  def testGroupingKeyForwardIfNotUsed: Unit = {
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
     // if we don't want the key in the output

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 17221d8..b3baa56 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testJoin: Unit = {
+  def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -63,7 +63,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithFilter: Unit = {
+  def testJoinWithFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -76,7 +76,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithMultipleKeys: Unit = {
+  def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -90,7 +90,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinNonExistingKey: Unit = {
+  def testJoinNonExistingKey(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -103,7 +103,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinWithNonMatchingKeyTypes: Unit = {
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -116,7 +116,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinWithAmbiguousFields: Unit = {
+  def testJoinWithAmbiguousFields(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
@@ -129,7 +129,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithAggregation: Unit = {
+  def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 6ba6c9f..1a13d93 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testSimpleSelectAll: Unit = {
+  def testSimpleSelectAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
@@ -66,7 +66,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testSimpleSelectAllWithAs: Unit = {
+  def testSimpleSelectAllWithAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
@@ -82,7 +82,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testSimpleSelectWithNaming: Unit = {
+  def testSimpleSelectWithNaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -97,7 +97,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
+  def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -108,7 +108,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
+  def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -119,7 +119,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
+  def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -131,7 +131,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
 
   @Test(expected = classOf[ExpressionException])
-  def testOnlyFieldRefInAs: Unit = {
+  def testOnlyFieldRefInAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 3f0f46f..65fe77a 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testSubstring: Unit = {
+  def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
       .select('a.substring(0, 'b))
@@ -60,7 +60,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test
-  def testSubstringWithMaxEnd: Unit = {
+  def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
       .select('a.substring('b))
@@ -71,7 +71,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring1: Unit = {
+  def testNonWorkingSubstring1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
@@ -83,7 +83,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring2: Unit = {
+  def testNonWorkingSubstring2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
deleted file mode 100644
index ef81dfe..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Abstract base class for unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the ExecutionEnvironment from
- * the context:
- *
- * <pre>{@code
- *
- *   @Test
- *   public void someTest() {
- *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   @Test
- *   public void anotherTest() {
- *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * }</pre>
- */
-public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
-
-	/**
-	 * Enum that defines which execution environment to run the next test on:
-	 * An embedded local flink cluster, or the collection execution backend.
-	 */
-	public enum TestExecutionMode {
-		CLUSTER,
-		COLLECTION
-	}
-
-	// -----------------------------------------------------------------------------------------...
-
-	private static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster = null;
-
-	protected transient TestExecutionMode mode;
-
-	public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
-		this.mode = mode;
-	}
-
-	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c8b79ef..a5a7da9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -36,15 +37,16 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 
 	protected final Configuration config;
 
-	protected ForkableFlinkMiniCluster executor;
-
 	private final List<File> tempFiles;
 
+	private final FiniteDuration timeout;
+	
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
 	protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
-
-	private final FiniteDuration timeout;
+	
+	protected ForkableFlinkMiniCluster executor;
+	
 
 	public AbstractTestBase(Configuration config) {
 		this.config = config;
@@ -57,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
 
-	public void startCluster() throws Exception{
-		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, false);
+	public void startCluster() throws Exception {
+		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
 	}
 
 	public void stopCluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index e0c4360..8dab485 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.StreamingMode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -51,11 +54,36 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class MultipleProgramsTestBase extends TestBaseUtils {
 
+	/**
+	 * Enum that defines which execution environment to run the next test on:
+	 * An embedded local flink cluster, or the collection execution backend.
+	 */
+	public enum TestExecutionMode {
+		CLUSTER,
+		COLLECTION
+	}
+	
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	protected static boolean startWebServer = false;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+	
+	// ------------------------------------------------------------------------
+	
+	protected final TestExecutionMode mode;
+
+	
 	public MultipleProgramsTestBase(TestExecutionMode mode){
-		super(mode);
-		switch(this.mode){
+		this.mode = mode;
+		
+		switch(mode){
 			case CLUSTER:
 				TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
 				clusterEnv.setAsContext();
@@ -67,6 +95,24 @@ public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Cluster setup & teardown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Parametrization lets the tests run in cluster and collection mode
+	// ------------------------------------------------------------------------
+	
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes(){
 		return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b0e4ae9..29cb2a4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,46 +80,55 @@ public class TestBaseUtils {
 
 	protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
 
-	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration
-			(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 
+	// ------------------------------------------------------------------------
+	
 	protected static File logDir;
 
 	protected TestBaseUtils(){
 		verifyJvmOptions();
 	}
-
-	private void verifyJvmOptions() {
+	
+	private static void verifyJvmOptions() {
 		long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
-
-	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
-			taskManagerNumSlots, boolean startWebserver) throws Exception {
+	
+	
+	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+															int taskManagerNumSlots,
+															StreamingMode mode,
+															boolean startWebserver) throws Exception {
+		
 		logDir = File.createTempFile("TestBaseUtils-logdir", null);
 		Assert.assertTrue("Unable to delete temp file", logDir.delete());
 		Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
+	
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
-		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
+
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+		
 		config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, startWebserver);
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
-		return new ForkableFlinkMiniCluster(config);
+		
+		return new ForkableFlinkMiniCluster(config, true, mode);
 	}
 
-	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout)
-			throws Exception {
-		if(logDir != null) {
-			logDir.delete();
+	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+		if (logDir != null) {
+			FileUtils.deleteDirectory(logDir);
 		}
-		if(executor != null) {
+		if (executor != null) {
 			int numUnreleasedBCVars = 0;
 			int numActiveConnections = 0;
 			{
@@ -169,11 +179,11 @@ public class TestBaseUtils {
 	//  Result Checking
 	// --------------------------------------------------------------------------------------------
 
-	public BufferedReader[] getResultReader(String resultPath) throws IOException {
+	public static BufferedReader[] getResultReader(String resultPath) throws IOException {
 		return getResultReader(resultPath, new String[]{}, false);
 	}
 
-	public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
+	public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
 											boolean inOrderOfFiles) throws IOException {
 		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 
@@ -206,12 +216,11 @@ public class TestBaseUtils {
 
 
 
-	public BufferedInputStream[] getResultInputStream(String resultPath) throws
-			IOException {
+	public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
 		return getResultInputStream(resultPath, new String[]{});
 	}
 
-	public BufferedInputStream[] getResultInputStream(String resultPath, String[]
+	public static BufferedInputStream[] getResultInputStream(String resultPath, String[]
 			excludePrefixes) throws IOException {
 		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
@@ -221,37 +230,37 @@ public class TestBaseUtils {
 		return inStreams;
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath) throws
-			IOException {
+	public static void readAllResultLines(List<String> target, String resultPath) throws IOException {
 		readAllResultLines(target, resultPath, new String[]{});
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath, String[]
-			excludePrefixes) throws IOException {
+	public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) 
+			throws IOException {
+		
 		readAllResultLines(target, resultPath, excludePrefixes, false);
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath, String[]
+	public static void readAllResultLines(List<String> target, String resultPath, String[]
 			excludePrefixes, boolean inOrderOfFiles) throws IOException {
 		for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
-			String s = null;
+			String s;
 			while ((s = reader.readLine()) != null) {
 				target.add(s);
 			}
 		}
 	}
 
-	public void compareResultsByLinesInMemory(String expectedResultStr, String
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String
 			resultPath) throws Exception {
 		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
 	}
 
-	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
 											String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 		Arrays.sort(result);
 
 		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
@@ -261,18 +270,18 @@ public class TestBaseUtils {
 		Assert.assertArrayEquals(expected, result);
 	}
 
-	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
 																	String resultPath) throws
 			Exception {
 		compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
 	}
 
-	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
 																	String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, true);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 
 		String[] expected = expectedResultStr.split("\n");
 
@@ -280,7 +289,7 @@ public class TestBaseUtils {
 		Assert.assertArrayEquals(expected, result);
 	}
 
-	public void checkLinesAgainstRegexp(String resultPath, String regexp){
+	public static void checkLinesAgainstRegexp(String resultPath, String regexp){
 		Pattern pattern = Pattern.compile(regexp);
 		Matcher matcher = pattern.matcher("");
 
@@ -301,17 +310,17 @@ public class TestBaseUtils {
 
 	}
 
-	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
 											String delimiter, double maxDelta) throws Exception {
 		compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
 	}
 
-	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
 											String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 		String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
 
 		Assert.assertEquals("Wrong number of result lines.", expected.length, result.length);
@@ -330,7 +339,7 @@ public class TestBaseUtils {
 		}
 	}
 
-	public <X> void compareResultCollections(List<X> expected, List<X> actual,
+	public static <X> void compareResultCollections(List<X> expected, List<X> actual,
 											Comparator<X> comparator) {
 		Assert.assertEquals(expected.size(), actual.size());
 
@@ -445,8 +454,8 @@ public class TestBaseUtils {
 	protected static void deleteRecursively(File f) throws IOException {
 		if (f.isDirectory()) {
 			FileUtils.deleteDirectory(f);
-		} else {
-			f.delete();
+		} else if (!f.delete()) {
+			System.err.println("Failed to delete file " + f.getAbsolutePath());
 		}
 	}
 	
@@ -469,13 +478,13 @@ public class TestBaseUtils {
 	// Web utils
 	//---------------------------------------------------------------------------------------------
 
-	public static String getFromHTTP(String url) throws Exception{
+	public static String getFromHTTP(String url) throws Exception {
 		URL u = new URL(url);
 		LOG.info("Accessing URL "+url+" as URL: "+u);
 		HttpURLConnection connection = (HttpURLConnection) u.openConnection();
 		connection.setConnectTimeout(100000);
 		connection.connect();
-		InputStream is = null;
+		InputStream is;
 		if(connection.getResponseCode() >= 400) {
 			// error!
 			LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
@@ -486,5 +495,4 @@ public class TestBaseUtils {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 2e664c1..3ea8624 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util
 
+import org.apache.flink.runtime.StreamingMode
 import org.scalatest.{Suite, BeforeAndAfter}
 
 /** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
@@ -46,17 +47,16 @@ import org.scalatest.{Suite, BeforeAndAfter}
   *          }}}
   *
   */
-trait FlinkTestBase
-  extends BeforeAndAfter {
+trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
   var cluster: Option[ForkableFlinkMiniCluster] = None
   val parallelism = 4
 
   before {
-    val cl = TestBaseUtils.startCluster(1, parallelism, false)
+    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
     val clusterEnvironment = new TestEnvironment(cl, parallelism)
-    clusterEnvironment.setAsContext
+    clusterEnvironment.setAsContext()
 
     cluster = Some(cl)
   }