Posted to commits@flink.apache.org by se...@apache.org on 2015/05/26 16:23:18 UTC

[1/4] flink git commit: [FLINK-2084] [core] Add an option to start Flink in streaming mode

Repository: flink
Updated Branches:
  refs/heads/master 924830ffa -> 043772244


[FLINK-2084] [core] Add an option to start Flink in streaming mode

 - Streaming mode sets the memory manager to lazy memory allocation to ensure
   the heap is not blocked by the batch memory manager
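
   A minimal sketch of what the lazy-allocation switch means in code (the
   names mirror the TaskManager diff further below; this is an illustration,
   not part of the commit):

       import org.apache.flink.runtime.StreamingMode

       // pre-allocate managed memory only in pure batch mode; in streaming
       // mode, allocate lazily so the heap stays free for streaming work
       def preAllocateMemory(mode: StreamingMode): Boolean =
         mode == StreamingMode.BATCH_ONLY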


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efec2297
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efec2297
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efec2297

Branch: refs/heads/master
Commit: efec2297b46b83e0dc6111a91a3f80f9d5375e0d
Parents: ea60678
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:12:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |   5 +-
 .../flink-bin/bin/start-cluster-streaming.sh    |  48 +++++++
 .../src/main/flink-bin/bin/start-cluster.sh     |   2 +-
 .../main/flink-bin/bin/start-local-streaming.sh |  27 ++++
 .../src/main/flink-bin/bin/start-local.bat      |   2 +-
 .../src/main/flink-bin/bin/start-local.sh       |   2 +-
 .../src/main/flink-bin/bin/taskmanager.sh       |   5 +-
 .../org/apache/flink/runtime/StreamingMode.java |  34 +++++
 .../jobmanager/JobManagerCliOptions.java        |  77 ++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../taskmanager/TaskManagerCliOptions.java      |  57 +++++++++
 .../flink/runtime/util/SignalHandler.java       |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 125 ++++++++++---------
 .../jobmanager/JobManagerCLIConfiguration.scala |  28 -----
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  31 +++--
 .../flink/runtime/taskmanager/TaskManager.scala |  69 ++++++----
 .../TaskManagerCLIConfiguration.scala           |  26 ----
 18 files changed, 402 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 273cdd3..0f1e4fa 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -20,6 +20,7 @@
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
+STREAMINGMODE=$3
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -80,7 +81,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo "Starting Job Manager"
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
         echo $! > $pid
 
     ;;
@@ -99,7 +100,7 @@ case $STARTSTOP in
     ;;
 
     (*)
-        echo "Please specify 'start (cluster|local)' or stop"
+        echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
     ;;
 
 esac

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
new file mode 100755
index 0000000..86a87cd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+HOSTLIST=$FLINK_SLAVES
+
+if [ "$HOSTLIST" = "" ]; then
+    HOSTLIST="${FLINK_CONF_DIR}/slaves"
+fi
+
+if [ ! -f "$HOSTLIST" ]; then
+    echo $HOSTLIST is not a valid slave list
+    exit 1
+fi
+
+# cluster mode, bring up job manager locally and a task manager on every slave host
+"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
+
+GOON=true
+while $GOON
+do
+    read line || GOON=false
+    if [ -n "$line" ]; then
+        HOST=$( extractHostName $line)
+        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
+    fi
+done < "$HOSTLIST"

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index db65032..666edeb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -43,6 +43,6 @@ do
     read line || GOON=false
     if [ -n "$line" ]; then
         HOST=$( extractHostName $line)
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start &"
+        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
     fi
 done < "$HOSTLIST"

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
new file mode 100755
index 0000000..2cb4d4a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# local mode, only bring up job manager. The job manager will start an internal task manager
+"$FLINK_BIN_DIR"/jobmanager.sh start local streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 386a631..202c7d9 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir "%FLINK_CONF_DIR%" > "%out%" 2>&1
+java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index f382763..7ea3ff4 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local
+"$FLINK_BIN_DIR"/jobmanager.sh start local batch

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 557ea2b..a99d39d 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -19,6 +19,7 @@
 
 
 STARTSTOP=$1
+STREAMINGMODE=$2
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -68,7 +69,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting task manager on host $HOSTNAME
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
         echo $! > $pid
 
     ;;
@@ -87,7 +88,7 @@ case $STARTSTOP in
     ;;
 
     (*)
-        echo Please specify start or stop
+        echo "Please specify 'start [batch|streaming]' or 'stop'"
     ;;
 
 esac

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
new file mode 100644
index 0000000..bdcbcf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+/**
+ * The streaming mode defines whether the system starts in streaming mode,
+ * or in pure batch mode. Note that streaming mode can execute batch programs
+ * as well.
+ */
+public enum StreamingMode {
+	
+	/** This mode indicates the system can run streaming tasks, of which batch
+	 * tasks are a special case. */
+	STREAMING,
+	
+	/** This mode indicates that the system can run only batch tasks */
+	BATCH_ONLY;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
new file mode 100644
index 0000000..988e3a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the JobManager.
+ */
+public class JobManagerCliOptions {
+
+	private String configDir;
+
+	private JobManagerMode jobManagerMode;
+	
+	private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
+
+	// ------------------------------------------------------------------------
+
+	public String getConfigDir() {
+		return configDir;
+	}
+
+	public void setConfigDir(String configDir) {
+		this.configDir = configDir;
+	}
+
+	public JobManagerMode getJobManagerMode() {
+		return jobManagerMode;
+	}
+
+	public void setJobManagerMode(String modeName) {
+		if (modeName.equalsIgnoreCase("cluster")) {
+			this.jobManagerMode = JobManagerMode.CLUSTER;
+		}
+		else if (modeName.equalsIgnoreCase("local")) {
+			this.jobManagerMode = JobManagerMode.LOCAL;
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Unknown execution mode. Execution mode must be one of 'cluster' or 'local'.");
+		}
+	}
+
+	public StreamingMode getStreamingMode() {
+		return streamingMode;
+	}
+
+	public void setStreamingMode(String modeName) {
+		if (modeName.equalsIgnoreCase("streaming")) {
+			this.streamingMode = StreamingMode.STREAMING;
+		}
+		else if (modeName.equalsIgnoreCase("batch")) {
+			this.streamingMode = StreamingMode.BATCH_ONLY;
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
+		}
+	}
+}

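For illustration, a hedged sketch of how the options holder above behaves
(the values are hypothetical; the setters map names case-insensitively and
fail fast on unknown ones):

    import org.apache.flink.runtime.StreamingMode
    import org.apache.flink.runtime.jobmanager.JobManagerCliOptions

    val opts = new JobManagerCliOptions()
    opts.setConfigDir("/opt/flink/conf")  // hypothetical path
    opts.setJobManagerMode("CLUSTER")     // case-insensitive
    opts.setStreamingMode("streaming")
    assert(opts.getStreamingMode() == StreamingMode.STREAMING)
    // opts.setStreamingMode("foo") would throw IllegalArgumentException
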
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 51ce91f..40198dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -74,7 +74,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * The Task represents one execution of a parallel subtask on a TaskManager.
  * A Task wraps a Flink operator (which may be a user function) and
- * runs it, providing all service necessary for example to consume input data,
+ * runs it, providing all services necessary for example to consume input data,
  * produce its results (intermediate result partitions) and communicate
  * with the JobManager.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
new file mode 100644
index 0000000..a648caf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the TaskManager.
+ */
+public class TaskManagerCliOptions {
+
+	private String configDir;
+	
+	private StreamingMode mode = StreamingMode.BATCH_ONLY;
+	
+	// ------------------------------------------------------------------------
+
+	public String getConfigDir() {
+		return configDir;
+	}
+
+	public void setConfigDir(String configDir) {
+		this.configDir = configDir;
+	}
+
+	public StreamingMode getMode() {
+		return mode;
+	}
+
+	public void setMode(String modeName) {
+		if (modeName.equalsIgnoreCase("streaming")) {
+			this.mode = StreamingMode.STREAMING;
+		}
+		else if (modeName.equalsIgnoreCase("batch")) {
+			this.mode = StreamingMode.BATCH_ONLY;
+		}
+		else {
+			throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'.");
+		}
+	}
+}
\ No newline at end of file

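Note the default built into the class above: when --streamingMode is never
passed, the mode stays BATCH_ONLY, so existing deployments keep the
pre-allocating batch behavior. A one-line check (illustration only):

    import org.apache.flink.runtime.StreamingMode
    import org.apache.flink.runtime.taskmanager.TaskManagerCliOptions

    assert(new TaskManagerCliOptions().getMode() == StreamingMode.BATCH_ONLY)
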
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
index 546e142..bcd3dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
 import sun.misc.Signal;
 
 /**
- * This signal handler / signal logger is based on Apache Hadoops org.apache.hadoop.util.SignalLogger.
+ * This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
  */
 public class SignalHandler {
 	private static boolean registered = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0c71938..ba819ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -99,7 +99,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
                  protected val accumulatorManager: AccumulatorManager,
                  protected val defaultExecutionRetries: Int,
                  protected val delayBetweenRetries: Long,
-                 protected val timeout: FiniteDuration)
+                 protected val timeout: FiniteDuration,
+                 protected val mode: StreamingMode)
   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of currently running jobs */
@@ -759,10 +760,11 @@ object JobManager {
   val STARTUP_FAILURE_RETURN_CODE = 1
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
+  /** Name of the JobManager actor */
   val JOB_MANAGER_NAME = "jobmanager"
-  val EVENT_COLLECTOR_NAME = "eventcollector"
+
+  /** Name of the archive actor */
   val ARCHIVE_NAME = "archive"
-  val PROFILER_NAME = "profiler"
 
 
   /**
@@ -778,6 +780,7 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
+         streamingMode: StreamingMode,
          listeningHost: String, listeningPort: Int) =
     try {
       parseArgs(args)
@@ -814,13 +817,15 @@ object JobManager {
         LOG.info("Security is enabled. Starting secure JobManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            runJobManager(configuration, executionMode, listeningHost, listeningPort)
+            runJobManager(configuration, executionMode, streamingMode,
+                          listeningHost, listeningPort)
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
-        runJobManager(configuration, executionMode, listeningHost, listeningPort)
+        runJobManager(configuration, executionMode, streamingMode,
+                      listeningHost, listeningPort)
       }
     }
     catch {
@@ -842,11 +847,13 @@ object JobManager {
    * @param configuration The configuration object for the JobManager.
   * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn
   *                      an additional TaskManager in the same process.
+   * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
    */
   def runJobManager(configuration: Configuration,
                     executionMode: JobManagerMode,
+                    streamingMode: StreamingMode,
                     listeningAddress: String,
                     listeningPort: Int) : Unit = {
 
@@ -880,7 +887,8 @@ object JobManager {
     try {
       // bring up the job manager actor
       LOG.info("Starting JobManager actor")
-      val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
+      val (jobManager, archiver) = startJobManagerActors(configuration, 
+                                                         jobManagerSystem, streamingMode)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -898,7 +906,8 @@ object JobManager {
                         listeningAddress,
                         Some(TaskManager.TASK_MANAGER_NAME),
                         Some(jobManager.path.toString),
-                        true, classOf[TaskManager])
+                        true, streamingMode,
+                        classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
         jobManagerSystem.actorOf(
@@ -936,61 +945,59 @@ object JobManager {
    * @param args command line arguments
   * @return Tuple of configuration, execution mode, streaming mode, and the listening host and port
    */
-  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
-    val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
+  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+    val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
 
-      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text {
-        "The configuration directory." }
-
-      opt[String]("executionMode") action { (arg, c) =>
-        val argLower = arg.toLowerCase()
-        var result: JobManagerCLIConfiguration = null
-
-        for (mode <- JobManagerMode.values() if result == null) {
-          val modeName = mode.name().toLowerCase()
-
-          if (modeName.equals(argLower)) {
-            result = c.copy(executionMode = mode)
-          }
-        }
+      opt[String]("configDir") action { (arg, conf) => 
+        conf.setConfigDir(arg)
+        conf
+      } text {
+        "The configuration directory."
+      }
 
-        if (result == null) {
-          throw new Exception("Unknown execution mode: " + arg)
-        } else {
-          result
-        }
+      opt[String]("executionMode") action { (arg, conf) =>
+        conf.setJobManagerMode(arg)
+        conf
       } text {
         "The execution mode of the JobManager (CLUSTER / LOCAL)"
       }
-    }
 
-    parser.parse(args, JobManagerCLIConfiguration()) map {
-      config =>
+      opt[String]("streamingMode").optional().action { (arg, conf) =>
+        conf.setStreamingMode(arg)
+        conf
+      } text {
+        "The streaming mode of the JobManager (STREAMING / BATCH)"
+      }
+    }
 
-        if (config.configDir == null) {
-          throw new Exception("Missing parameter '--configDir'")
-        }
-        if (config.executionMode == null) {
-          throw new Exception("Missing parameter '--executionMode'")
-        }
+    val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
+      throw new Exception(
+        s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
+    }
+    
+    val configDir = config.getConfigDir()
+    
+    if (configDir == null) {
+      throw new Exception("Missing parameter '--configDir'")
+    }
+    if (config.getJobManagerMode() == null) {
+      throw new Exception("Missing parameter '--executionMode'")
+    }
 
-        LOG.info("Loading configuration from " + config.configDir)
-        GlobalConfiguration.loadConfiguration(config.configDir)
-        val configuration = GlobalConfiguration.getConfiguration
+    LOG.info("Loading configuration from " + configDir)
+    GlobalConfiguration.loadConfiguration(configDir)
+    val configuration = GlobalConfiguration.getConfiguration()
 
-        if (config.configDir != null && new File(config.configDir).isDirectory) {
-          configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
-        }
+    if (new File(configDir).isDirectory) {
+      configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
+    }
 
-        val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        (configuration, config.executionMode, hostname, port)
-    } getOrElse {
-      throw new Exception("Invalid command line arguments: " + parser.usage)
-    }
+    (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
   }
 
   /**
@@ -1082,9 +1089,12 @@ object JobManager {
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(configuration: Configuration,
-                            actorSystem: ActorSystem): (ActorRef, ActorRef) = {
+                            actorSystem: ActorSystem,
+                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
 
-    startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
+    startJobManagerActors(configuration, actorSystem,
+                          Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME),
+                          streamingMode)
   }
   /**
    * Starts the JobManager and job archiver based on the given configuration, in the
@@ -1096,18 +1106,21 @@ object JobManager {
    *                          the actor will have the name generated by the actor system.
    * @param archiverActorName Optionally the name of the archive actor. If none is given,
    *                          the actor will have the name generated by the actor system.
+   * @param streamingMode The mode to run the system in (streaming vs. batch-only)
+   * 
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(configuration: Configuration,
                             actorSystem: ActorSystem,
                             jobMangerActorName: Option[String],
-                            archiverActorName: Option[String]): (ActorRef, ActorRef) = {
+                            archiverActorName: Option[String],
+                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
 
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
       executionRetries, delayBetweenRetries,
       timeout, _) = createJobManagerComponents(configuration)
 
-    // start the archiver wither with the given name, or without (avoid name conflicts)
+    // start the archiver with the given name, or without (avoid name conflicts)
     val archiver: ActorRef = archiverActorName match {
       case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
       case None => actorSystem.actorOf(archiveProps)
@@ -1115,7 +1128,7 @@ object JobManager {
 
     val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
         libraryCacheManager, archiver, accumulatorManager, executionRetries,
-        delayBetweenRetries, timeout)
+        delayBetweenRetries, timeout, streamingMode)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

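Putting the JobManager changes together: parseArgs now yields a 5-tuple that
includes the streaming mode, which runJobManager threads into the actor. A
minimal sketch against the signatures above (argument values hypothetical):

    import org.apache.flink.runtime.jobmanager.JobManager

    val args = Array("--configDir", "/opt/flink/conf",  // hypothetical directory
                     "--executionMode", "cluster",
                     "--streamingMode", "streaming")

    val (config, executionMode, streamingMode, host, port) = JobManager.parseArgs(args)
    JobManager.runJobManager(config, executionMode, streamingMode, host, port)
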
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
deleted file mode 100644
index 4cd02c5..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-/**
- * Holder for command line parameters of the JobManager.
- *
- * @param configDir The directory to load the configuration from.
- * @param executionMode Mode for the JobManager.
- */
-case class JobManagerCLIConfiguration(configDir: String = null,
-                                      executionMode: JobManagerMode = null) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8a6c394..edc12a1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -26,6 +26,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.JobSubmissionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -44,10 +45,16 @@ import scala.concurrent.{Future, Await}
  * @param userConfiguration Configuration object with the user provided configuration values
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
+ * @param streamingMode The mode in which to start the system: STREAMING, or
+ *                      BATCH_ONLY for pure batch mode.
  */
 abstract class FlinkMiniCluster(val userConfiguration: Configuration,
-                                val singleActorSystem: Boolean) {
+                                val singleActorSystem: Boolean,
+                                val streamingMode: StreamingMode) {
 
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
+         = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  
   protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e2d7cc1..663307d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.runtime.minicluster
 
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
+
 import org.slf4j.LoggerFactory
-import akka.actor.ExtendedActorSystem
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -41,9 +42,20 @@ import akka.actor.ExtendedActorSystem
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
  */
-class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
-
+class LocalFlinkMiniCluster(userConfiguration: Configuration,
+                            singleActorSystem: Boolean,
+                            streamingMode: StreamingMode)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
+  
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+  // --------------------------------------------------------------------------
+  
+  
   val jobClientActorSystem = if (singleActorSystem) {
     jobManagerActorSystem
   } else {
@@ -64,7 +76,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
 
   override def startJobManager(system: ActorSystem): ActorRef = {
     val config = configuration.clone()
-    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system)
+       
+    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
+    
     if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
       val webServer = new WebInfoServer(configuration, jobManager, archiver)
       webServer.start()
@@ -103,12 +117,13 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     } else {
       None
     }
-
+    
     TaskManager.startTaskManagerComponentsAndActor(config, system,
                                                    HOSTNAME, // network interface to bind to
                                                    Some(taskManagerActorName), // actor name
                                                    jobManagerPath, // job manager akka URL
                                                    localExecution, // start network stack?
+                                                   streamingMode,
                                                    classOf[TaskManager])
   }
 

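With the auxiliary constructors above, call sites can opt into streaming mode
explicitly while existing one- and two-argument call sites keep compiling and
default to BATCH_ONLY; a hedged sketch (configuration contents hypothetical):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.StreamingMode
    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

    val conf = new Configuration()
    val streamingCluster = new LocalFlinkMiniCluster(conf, true, StreamingMode.STREAMING)
    val batchCluster     = new LocalFlinkMiniCluster(conf)  // defaults to BATCH_ONLY
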
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a95b5cb..8a45fa4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,7 +37,7 @@ import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -999,8 +999,8 @@ object TaskManager {
   /** Return code for critical errors during the runtime */
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
+  /** The name of the TaskManager actor */
   val TASK_MANAGER_NAME = "taskmanager"
-  val PROFILER_NAME = "profiler"
 
   /** Maximum time (msecs) that the TaskManager will spend searching for a
     * suitable network interface to use for communication */
@@ -1033,7 +1033,8 @@ object TaskManager {
     EnvironmentInformation.checkJavaVersion()
 
     // try to parse the command line arguments
-    val configuration = try {
+    val (configuration: Configuration,
+         mode: StreamingMode) = try {
       parseArgsAndLoadConfig(args)
     }
     catch {
@@ -1050,13 +1051,13 @@ object TaskManager {
         LOG.info("Security is enabled. Starting secure TaskManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+            selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+        selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
       }
     }
     catch {
@@ -1074,31 +1075,44 @@ object TaskManager {
   * @return The parsed configuration and the streaming mode.
    */
   @throws(classOf[Exception])
-  def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
-
+  def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = {
+    
     // set up the command line parser
-    val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
-      head("flink task manager")
-      opt[String]("configDir") action { (x, c) =>
-        c.copy(configDir = x)
-      } text "Specify configuration directory."
+    val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") {
+      head("Flink TaskManager")
+      
+      opt[String]("configDir") action { (param, conf) =>
+        conf.setConfigDir(param)
+        conf
+      } text {
+        "Specify configuration directory."
+      }
+
+      opt[String]("streamingMode").optional().action { (param, conf) =>
+        conf.setMode(param)
+        conf
+      } text {
+        "The streaming mode of the TaskManager (STREAMING / BATCH)"
+      }
     }
 
     // parse the CLI arguments
-    val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse {
+    val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse {
       throw new Exception(
+        s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
 
     // load the configuration
-    try {
-      LOG.info("Loading configuration from " + cliConfig.configDir)
-      GlobalConfiguration.loadConfiguration(cliConfig.configDir)
+    val conf: Configuration = try {
+      LOG.info("Loading configuration from " + cliConfig.getConfigDir())
+      GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
       GlobalConfiguration.getConfiguration()
     }
     catch {
       case e: Exception => throw new Exception("Could not load configuration", e)
     }
+    
+    (conf, cliConfig.getMode)
   }
 
   // --------------------------------------------------------------------------
@@ -1120,11 +1134,13 @@ object TaskManager {
    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
 
    * @param configuration The configuration for the TaskManager.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate.
    *                         Allows to use TaskManager subclasses for example for YARN.
    */
   @throws(classOf[Exception])
   def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
+                                              streamingMode: StreamingMode,
                                               taskManagerClass: Class[_ <: TaskManager]) : Unit = {
 
     val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
@@ -1132,7 +1148,8 @@ object TaskManager {
     val (taskManagerHostname, actorSystemPort) =
        selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+                   streamingMode, taskManagerClass)
   }
 
   @throws(classOf[IOException])
@@ -1196,14 +1213,17 @@ object TaskManager {
    * @param taskManagerHostname The hostname/address of the interface where the actor system
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
   def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int,
-                     configuration: Configuration) : Unit = {
+                     actorSystemPort: Int, 
+                     configuration: Configuration,
+                     streamingMode: StreamingMode) : Unit = {
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager])
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+                   streamingMode, classOf[TaskManager])
   }
 
   /**
@@ -1218,6 +1238,7 @@ object TaskManager {
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
    *                         subclasses for example for YARN.
    */
@@ -1225,6 +1246,7 @@ object TaskManager {
   def runTaskManager(taskManagerHostname: String,
                      actorSystemPort: Int,
                      configuration: Configuration,
+                     streamingMode: StreamingMode,
                      taskManagerClass: Class[_ <: TaskManager]) : Unit = {
 
     LOG.info("Starting TaskManager")
@@ -1264,6 +1286,7 @@ object TaskManager {
                                                            taskManagerHostname,
                                                            Some(TASK_MANAGER_NAME),
                                                            None, false,
+                                                           streamingMode,
                                                            taskManagerClass)
 
      // start a process reaper that watches the TaskManager. If the TaskManager actor dies,
@@ -1317,6 +1340,7 @@ object TaskManager {
   *                       JobManager hostname and port specified in the configuration.
    * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
    *                                      TCP network stack.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The class of the TaskManager actor. May be used to give
    *                         subclasses that understand additional actor messages.
    *
@@ -1339,6 +1363,7 @@ object TaskManager {
                                          taskManagerActorName: Option[String],
                                          jobManagerPath: Option[String],
                                          localTaskManagerCommunication: Boolean,
+                                         streamingMode: StreamingMode,
                                          taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
 
     // get and check the JobManager config
@@ -1391,13 +1416,15 @@ object TaskManager {
 
       relativeMemSize
     }
+    
+    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
 
     // now start the memory manager
     val memoryManager = try {
       new DefaultMemoryManager(memorySize,
                                taskManagerConfig.numberOfSlots,
                                netConfig.networkBufferSize,
-                               true)
+                               preAllocateMemory)
     }
     catch {
       case e: OutOfMemoryError => throw new Exception(

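Correspondingly, a TaskManager can be brought up programmatically in either
mode via the new overloads; a minimal sketch against the signatures above
(the bind address and port are hypothetical):

    import org.apache.flink.configuration.GlobalConfiguration
    import org.apache.flink.runtime.StreamingMode
    import org.apache.flink.runtime.taskmanager.TaskManager

    val config = GlobalConfiguration.getConfiguration()
    TaskManager.runTaskManager("localhost", 6122, config, StreamingMode.STREAMING)
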
http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
deleted file mode 100644
index 5c71f5e..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-/**
- * Command line configuration object for the [[TaskManager]]
- *
- * @param configDir Path to configuration directory
- */
-case class TaskManagerCLIConfiguration(configDir: String = null)


[3/4] flink git commit: [tests] Adjust tests for dedicated streaming mode and clean up test bases.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 05093b5..762d77e 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -21,6 +21,7 @@ package org.apache.flink.test.util
 import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.pattern.Patterns._
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
@@ -39,11 +40,19 @@ import scala.concurrent.Await
  * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
  *                          same [[ActorSystem]], otherwise false.
  */
-class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
-  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+class ForkableFlinkMiniCluster(userConfiguration: Configuration,
+                               singleActorSystem: Boolean,
+                               streamingMode: StreamingMode)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+  
 
-  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
+       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
 
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+  
+  // --------------------------------------------------------------------------
+  
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val forNumberString = System.getProperty("forkNumber")
 
@@ -77,10 +86,10 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
 
     val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+    
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
       libraryCacheManager, archive, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout) with TestingJobManager)
+      delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
 
@@ -118,7 +127,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
 
     TaskManager.startTaskManagerComponentsAndActor(config, system, HOSTNAME,
         Some(TaskManager.TASK_MANAGER_NAME + index), jobManagerAkkaUrl, localExecution,
-         classOf[TestingTaskManager])
+      streamingMode, classOf[TestingTaskManager])
   }
 
   def restartJobManager(): Unit = {

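Test suites can then pick the mode per cluster; e.g. (a sketch mirroring the
constructors above, with a hypothetical configuration):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.StreamingMode
    import org.apache.flink.test.util.ForkableFlinkMiniCluster

    val testCluster = new ForkableFlinkMiniCluster(new Configuration(), false, StreamingMode.STREAMING)
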
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index d55aff4..e001fc8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -118,7 +119,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
 			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.STREAMING)._1();
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -369,7 +370,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
 
 				// wait forever
 				Object lock = new Object();

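As a compact reference for the two signatures changed above, a hedged Java sketch (default configuration; registration details such as the JobManager address are omitted) of starting both components in streaming mode:

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.StreamingMode;
    import org.apache.flink.runtime.akka.AkkaUtils;
    import org.apache.flink.runtime.jobmanager.JobManager;
    import org.apache.flink.runtime.taskmanager.TaskManager;

    public class StreamingStartupSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();

            // The JobManager actors now take the streaming mode explicitly.
            ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
            ActorRef jobManager = JobManager.startJobManagerActors(
                    config, actorSystem, StreamingMode.STREAMING)._1();

            // The TaskManager entry point takes the mode as its second argument.
            TaskManager.selectNetworkInterfaceAndRunTaskManager(
                    config, StreamingMode.STREAMING, TaskManager.class);
        }
    }
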
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 5482fc5..706d197 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -99,7 +100,7 @@ public class ProcessFailureCancelingITCase {
 			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.BATCH_ONLY)._1();
 
 			// the TaskManager java command
 			String[] command = new String[] {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index ab70bcc..31f8560 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -37,10 +37,11 @@ import java.util.Collection;
 @RunWith(Parameterized.class)
 public class WebFrontendITCase extends MultipleProgramsTestBase {
 
-	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, 4, true);
+	// make sure that the webserver is started for us!
+	static {
+		startWebServer = true;
 	}
+	
 
 	public WebFrontendITCase(TestExecutionMode m) {
 		super(m);

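The static initializer replaces the old @BeforeClass hook: it runs when JUnit loads the subclass, before the base class brings up the cluster, so the inherited flag is already set. A sketch of the pattern with hypothetical stand-in names (BaseTest and startCluster are not from this patch):

    abstract class BaseTest {
        protected static boolean startWebServer = false;

        static void startCluster() {
            // ... would start the test cluster, honoring startWebServer ...
        }
    }

    class WebTest extends BaseTest {
        // Subclass static initializers run on class loading, ahead of any
        // test-framework setup methods, so the flag is set in time.
        static {
            startWebServer = true;
        }

        public static void main(String[] args) {
            System.out.println("startWebServer = " + startWebServer);
        }
    }
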
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index d19f543..5ad5351 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.scala.actions
 
 import org.apache.flink.api.scala._
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index e9b4ffe..8f1e1f8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.functions
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.Assert.fail
 import org.junit.{After, Before, Test, Rule}
 import org.junit.rules.TemporaryFolder
@@ -49,7 +49,7 @@ class ClosureCleanerITCase(mode: TestExecutionMode) extends MultipleProgramsTest
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(result, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(result, resultPath)
   }
 
   @Test

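The same mechanical change, calling the comparison helper as a static method on TestBaseUtils rather than as an inherited one, repeats through the ITCase diffs below. A minimal Java sketch of the call form (the expected data and result path are placeholders):

    import org.apache.flink.test.util.TestBaseUtils;

    public class CompareResultsSketch {
        public static void main(String[] args) throws Exception {
            String expected = "1,Hello\n2,World";    // placeholder data
            String resultPath = "/tmp/test-result";  // placeholder path
            // Compares the expected content line by line against the
            // files written under resultPath.
            TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
        }
    }
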
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index c8d6639..b09ecc4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.hadoop.mapred
 import org.apache.flink.api.scala._
 
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{Text, LongWritable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
@@ -35,7 +35,8 @@ class WordCountMapredITCase extends JavaProgramTestBase {
   }
 
   protected override def postSubmit() {
-    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+                                                resultPath, Array[String](".", "_"))
   }
 
   protected def testProgram() {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index 8988baf..de2d376 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.hadoop.mapreduce
 
 import org.apache.flink.api.scala._
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{Text, LongWritable}
 import org.apache.hadoop.mapreduce.Job
@@ -37,7 +37,8 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
   }
 
   protected override def postSubmit() {
-    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
+                                                resultPath, Array[String](".", "_"))
   }
 
   protected def testProgram() {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 7e395d9..eb5ee58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -22,8 +22,8 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit.Assert._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class ScalaCsvReaderWithPOJOITCase(mode: TestExecutionMode) extends MultipleProg
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   def createInputData(data: String): String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 432a6d4..79f781f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index eaf4117..448479e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -50,7 +50,7 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index 512ec6c..82fd1a2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class CrossITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 8c10271..cf82ce9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 7cf802c..3e98adf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -17,20 +17,17 @@
  */
 package org.apache.flink.api.scala.operators
 
+import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.apache.flink.api.scala._
-import org.junit.runners.Parameterized.Parameters
-import scala.collection.JavaConverters._
 
-import scala.collection.mutable
 
 // TODO case class Tuple2[T1, T2](_1: T1, _2: T2)
 // TODO case class Foo(a: Int, b: String
@@ -77,7 +74,7 @@ class ExamplesITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 082201b..54ae0e7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -17,12 +17,14 @@
  */
 package org.apache.flink.api.scala.operators
 
+import org.apache.flink.api.scala._
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -30,8 +32,6 @@ import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
 
-import org.apache.flink.api.scala._
-
 @RunWith(classOf[Parameterized])
 class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   private var resultPath: String = null
@@ -48,7 +48,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 36cc3a7..4d6405d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,7 +45,7 @@ class FirstNITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 61751da..21d7a0b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
@@ -50,7 +50,7 @@ class FlatMapITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 1b40205..576ecdf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit._
@@ -67,9 +67,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
-    env.execute
+    env.execute()
   }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index b832647..09f0c51 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,8 +28,8 @@ CustomType}
 import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.hamcrest.core.{IsNot, IsEqual}
 import org.junit._
@@ -58,7 +58,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   @After
   def after(): Unit = {
     if(expected != null) {
-      compareResultsByLinesInMemory(expected, resultPath)
+      TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index fc4a9ce..a958250 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.RichJoinFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -48,7 +48,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 64fdc1f..d4555b7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class MapITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 08d82d2..7d26643 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -46,7 +46,7 @@ class PartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index a9f420f..69cb6f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -49,7 +49,7 @@ class ReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index 10bb7c5..d94f099 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index b0b3764..cab6ab7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -51,7 +51,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
   @After
   def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 48a33f7..2e554fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder
 
@@ -57,7 +57,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -78,7 +78,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -99,7 +99,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -122,7 +122,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -143,7 +143,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -164,7 +164,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -187,7 +187,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -208,7 +208,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("60", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("60", resultPath)
   }
 
   @Test
@@ -229,7 +229,7 @@ class ScalaSpecialTypesITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     env.execute()
 
-    compareResultsByLinesInMemory("80", resultPath)
+    TestBaseUtils.compareResultsByLinesInMemory("80", resultPath)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 3556ec1..564a0bd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -25,16 +25,19 @@ import java.util.Map;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.yarn.YarnTaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.yarn.FlinkYarnClient;
+
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The entry point for running a TaskManager in a YARN container. The YARN container will invoke
  * this class' main method.
@@ -51,8 +54,11 @@ public class YarnTaskManagerRunner {
 
 		// try to parse the command line arguments
 		final Configuration configuration;
+		final StreamingMode mode;
 		try {
-			configuration = TaskManager.parseArgsAndLoadConfig(args);
+			scala.Tuple2<Configuration, StreamingMode> res = TaskManager.parseArgsAndLoadConfig(args);
+			configuration = res._1();
+			mode = res._2();
 		}
 		catch (Throwable t) {
 			LOG.error(t.getMessage(), t);
@@ -87,7 +93,8 @@ public class YarnTaskManagerRunner {
 			@Override
 			public Object run() {
 				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, YarnTaskManager.class);
+					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration,
+																		mode, YarnTaskManager.class);
 				}
 				catch (Throwable t) {
 					LOG.error("Error while starting the TaskManager", t);

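For reference, the full shape of the new two-value parse step as plain Java (a sketch mirroring the code above; logging and exit handling omitted):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.StreamingMode;
    import org.apache.flink.runtime.taskmanager.TaskManager;

    public class ParseArgsSketch {
        public static void main(String[] args) throws Exception {
            // parseArgsAndLoadConfig now yields the configuration and the
            // streaming mode together, as a Scala tuple.
            scala.Tuple2<Configuration, StreamingMode> res =
                    TaskManager.parseArgsAndLoadConfig(args);
            Configuration configuration = res._1();
            StreamingMode mode = res._2();
            System.out.println("TaskManager would start in mode: " + mode);
        }
    }
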
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 7884edd..c1a937e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -24,6 +24,7 @@ import akka.actor._
 import grizzled.slf4j.Logger
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
@@ -54,6 +55,8 @@ object ApplicationMaster {
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
     EnvironmentInformation.checkJavaVersion()
     org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
+    
+    val streamingMode = StreamingMode.BATCH_ONLY
 
     val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
 
@@ -91,9 +94,12 @@ object ApplicationMaster {
           val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
           val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
 
-          val (config, system, jobManager, archiver) = startJobManager(currDir, ownHostname,
-                                                      dynamicPropertiesEncodedString)
-
+          val (config: Configuration,
+               system: ActorSystem,
+               jobManager: ActorRef,
+               archiver: ActorRef) = startJobManager(currDir, ownHostname,
+                                                     dynamicPropertiesEncodedString,
+                                                     streamingMode)
           actorSystem = system
           val extActor = system.asInstanceOf[ExtendedActorSystem]
           val jobManagerPort = extActor.provider.getDefaultAddress.port.get
@@ -195,15 +201,12 @@ object ApplicationMaster {
   /**
    * Starts the JobManager and all its components.
    *
-   * @param currDir
-   * @param hostname
-   * @param dynamicPropertiesEncodedString
-   *
    * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef)
    */
   def startJobManager(currDir: String,
                       hostname: String,
-                      dynamicPropertiesEncodedString: String):
+                      dynamicPropertiesEncodedString: String,
+                      streamingMode: StreamingMode):
     (Configuration, ActorSystem, ActorRef, ActorRef) = {
 
     LOG.info("Starting JobManager for YARN")
@@ -236,7 +239,7 @@ object ApplicationMaster {
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
       libraryCacheManager, archiver, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout) with ApplicationMasterActor)
+      delayBetweenRetries, timeout, streamingMode) with ApplicationMasterActor)
 
     LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)


[4/4] flink git commit: [tests] Adjust tests for dedicated streaming mode and clean up test bases.

Posted by se...@apache.org.
[tests] Adjust tests for dedicated streaming mode and clean up test bases.

This closes #718


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04377224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04377224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04377224

Branch: refs/heads/master
Commit: 0437722449c59fdc21fc5d18a59e5e0b961208af
Parents: efec229
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:24:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:23 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../JobManagerProcessReapingTest.java           |  3 +-
 .../jobmanager/JobManagerStartupTest.java       | 11 ++-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  3 +-
 ...askManagerComponentsStartupShutdownTest.java |  4 +-
 .../TaskManagerProcessReapingTest.java          |  5 +-
 .../TaskManagerRegistrationTest.java            |  9 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 12 ++-
 .../taskmanager/TestManagerStartupTest.java     | 10 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  6 +-
 .../runtime/testingUtils/TestingCluster.scala   | 24 +++--
 .../runtime/testingUtils/TestingUtils.scala     | 28 +-----
 .../api/complex/ComplexIntegrationTest.java     |  9 +-
 .../util/StreamingMultipleProgramsTestBase.java | 53 ++++++-----
 .../util/StreamingProgramTestBase.java          |  2 +-
 .../scala/table/test/AggregationsITCase.scala   | 20 ++---
 .../flink/api/scala/table/test/AsITCase.scala   | 21 ++---
 .../api/scala/table/test/CastingITCase.scala    | 14 +--
 .../scala/table/test/ExpressionsITCase.scala    | 22 ++---
 .../api/scala/table/test/FilterITCase.scala     | 20 ++---
 .../table/test/GroupedAggreagationsITCase.scala | 14 +--
 .../flink/api/scala/table/test/JoinITCase.scala | 22 ++---
 .../api/scala/table/test/SelectITCase.scala     | 22 ++---
 .../table/test/StringExpressionsITCase.scala    | 16 ++--
 .../util/AbstractMultipleProgramsTestBase.java  | 84 ------------------
 .../flink/test/util/AbstractTestBase.java       | 14 +--
 .../test/util/MultipleProgramsTestBase.java     | 52 ++++++++++-
 .../apache/flink/test/util/TestBaseUtils.java   | 92 +++++++++++---------
 .../apache/flink/test/util/FlinkTestBase.scala  |  8 +-
 .../test/util/ForkableFlinkMiniCluster.scala    | 21 +++--
 .../AbstractProcessFailureRecoveryTest.java     |  5 +-
 .../recovery/ProcessFailureCancelingITCase.java |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  9 +-
 .../api/scala/actions/CountCollectITCase.scala  |  2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |  6 +-
 .../hadoop/mapred/WordCountMapredITCase.scala   |  5 +-
 .../mapreduce/WordCountMapreduceITCase.scala    |  5 +-
 .../scala/io/ScalaCsvReaderWithPOJOITCase.scala |  6 +-
 .../api/scala/operators/AggregateITCase.scala   |  6 +-
 .../api/scala/operators/CoGroupITCase.scala     |  6 +-
 .../flink/api/scala/operators/CrossITCase.scala |  6 +-
 .../api/scala/operators/DistinctITCase.scala    |  6 +-
 .../api/scala/operators/ExamplesITCase.scala    | 13 ++-
 .../api/scala/operators/FilterITCase.scala      | 10 +--
 .../api/scala/operators/FirstNITCase.scala      |  6 +-
 .../api/scala/operators/FlatMapITCase.scala     |  6 +-
 .../scala/operators/GroupCombineITCase.scala    |  5 +-
 .../api/scala/operators/GroupReduceITCase.scala |  6 +-
 .../flink/api/scala/operators/JoinITCase.scala  |  6 +-
 .../flink/api/scala/operators/MapITCase.scala   |  6 +-
 .../api/scala/operators/PartitionITCase.scala   |  6 +-
 .../api/scala/operators/ReduceITCase.scala      |  6 +-
 .../api/scala/operators/SumMinMaxITCase.scala   |  2 +-
 .../flink/api/scala/operators/UnionITCase.scala |  6 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala | 22 ++---
 .../yarn/appMaster/YarnTaskManagerRunner.java   | 15 +++-
 .../apache/flink/yarn/ApplicationMaster.scala   | 21 +++--
 57 files changed, 421 insertions(+), 404 deletions(-)
----------------------------------------------------------------------

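The bulk of the test changes below follow two patterns: threading the new StreamingMode argument through the JobManager/TaskManager start methods, and replacing the removed AbstractMultipleProgramsTestBase with the consolidated MultipleProgramsTestBase, whose nested TestExecutionMode is now imported directly. A minimal Java sketch of the adjusted test-base usage, with a hypothetical test name:

    import org.apache.flink.test.util.MultipleProgramsTestBase;
    import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode;

    public class MyITCaseSketch extends MultipleProgramsTestBase {
        // The execution mode arrives via the parameterized runner and is
        // handed to the consolidated base class.
        public MyITCaseSketch(TestExecutionMode mode) {
            super(mode);
        }
    }
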

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ba819ca..d5df633 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -945,7 +945,8 @@ object JobManager {
    * @param args command line arguments
   * @return Quintuple of configuration, JobManager mode, streaming mode, listening host and port
    */
-  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+  def parseArgs(args: Array[String]):
+                     (Configuration, JobManagerMode, StreamingMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index a332ee1..be73bf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,6 +27,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -183,7 +184,7 @@ public class JobManagerProcessReapingTest {
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
-				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port);
+				JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.BATCH_ONLY, "localhost", port);
 				System.exit(0);
 			}
 			catch (Throwable t) {

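The blocking entry point gains the same argument; a minimal sketch of the new five-argument call (the port value is a placeholder, and JobManagerMode is assumed to live in org.apache.flink.runtime.jobmanager, the package of the test above):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.StreamingMode;
    import org.apache.flink.runtime.jobmanager.JobManager;
    import org.apache.flink.runtime.jobmanager.JobManagerMode;

    public class RunJobManagerSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            int port = 6123;  // placeholder port
            // Starts the JobManager; StreamingMode is the new argument.
            JobManager.runJobManager(config, JobManagerMode.CLUSTER,
                    StreamingMode.BATCH_ONLY, "localhost", port);
        }
    }
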
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 070e376..e7665a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.net.NetUtils;
 
 import org.junit.After;
@@ -80,7 +81,8 @@ public class JobManagerStartupTest {
 		}
 		
 		try {
-			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
+			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER,
+									StreamingMode.BATCH_ONLY, "localhost", portNum);
 			fail("this should throw an exception");
 		}
 		catch (Exception e) {
@@ -94,7 +96,9 @@ public class JobManagerStartupTest {
 			try {
 				portOccupier.close();
 			}
-			catch (Throwable t) {}
+			catch (Throwable t) {
+				// ignore
+			}
 		}
 	}
 
@@ -117,7 +121,8 @@ public class JobManagerStartupTest {
 		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
 
 		try {
-			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
+			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER,
+										StreamingMode.BATCH_ONLY, "localhost", portNum);
 			fail("this should fail with an exception");
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index e69687f..7b6d688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -61,7 +62,7 @@ public class JobSubmitTest {
 
 		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
 		jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
-		jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1();
+		jobManager = JobManager.startJobManagerActors(config, jobManagerSystem, StreamingMode.BATCH_ONLY)._1();
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7d4994d..a67cd00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -68,7 +69,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 		try {
 			actorSystem = AkkaUtils.createLocalActorSystem(config);
 
-			final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+			final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem, 
+																			StreamingMode.BATCH_ONLY)._1();
 
 			// create the components for the TaskManager manually
 			final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 91fadca..c55a721 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -84,7 +85,7 @@ public class TaskManagerProcessReapingTest {
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
-			JobManager.startJobManagerActors(new Configuration(), jmActorSystem);
+			JobManager.startJobManagerActors(new Configuration(), jmActorSystem, StreamingMode.BATCH_ONLY);
 
 			final int taskManagerPort = NetUtils.getAvailablePort();
 
@@ -198,7 +199,7 @@ public class TaskManagerProcessReapingTest {
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
 
-				TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
+				TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
 
 				// wait forever
 				Object lock = new Object();

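The TaskManager side mirrors this: runTaskManager appends the StreamingMode after the configuration. A sketch, assuming `taskManagerPort` is a free port as obtained in the test via NetUtils.getAvailablePort():

    Configuration cfg = new Configuration();
    cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);           // small heap, test-sized
    cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);

    // the mode argument is the only addition; everything else matches the old signature
    TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
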
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 69964ea..3e5f2cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -152,7 +153,8 @@ public class TaskManagerRegistrationTest {
 				Thread.sleep(6000);
 
 				// now start the JobManager, with the regular akka URL
-				final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+				final ActorRef jobManager =
+						JobManager.startJobManagerActors(new Configuration(), actorSystem, StreamingMode.BATCH_ONLY)._1();
 
 				// check that the TaskManagers are registered
 				Future<Object> responseFuture = Patterns.ask(
@@ -371,6 +373,7 @@ public class TaskManagerRegistrationTest {
 							NONE_STRING, // no actor name -> random
 							new Some<String>(jobManager.path().toString()), // job manager path
 							false, // init network stack !!!
+							StreamingMode.BATCH_ONLY,
 							TaskManager.class);
 
 					watch(taskManager);
@@ -415,7 +418,8 @@ public class TaskManagerRegistrationTest {
 	private static ActorRef startJobManager() throws Exception {
 		// start the actors. don't give names, so they get generated names and we
 		// avoid conflicts with the actor names
-		return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+		return JobManager.startJobManagerActors(new Configuration(), actorSystem, 
+												NONE_STRING, NONE_STRING, StreamingMode.BATCH_ONLY)._1();
 	}
 
 	private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
@@ -430,6 +434,7 @@ public class TaskManagerRegistrationTest {
 				NONE_STRING, // no actor name -> random
 				new Some<String>(jobManagerUrl), // job manager path
 				true, // local network stack only
+				StreamingMode.BATCH_ONLY,
 				TaskManager.class);
 	}
 

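For tests that wire the components up by hand, startTaskManagerComponentsAndActor takes the StreamingMode as its second-to-last argument, just before the actor class. A sketch assuming `system` is the test ActorSystem and `jobManager` the ActorRef of a running JobManager, using scala.Option/scala.Some exactly as the tests above do:

    ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
            new Configuration(), system,
            "localhost",
            Option.<String>empty(),                          // no actor name -> random
            new Some<String>(jobManager.path().toString()),  // job manager path
            true,                                            // local network stack only
            StreamingMode.BATCH_ONLY,
            TaskManager.class);
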
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a308c81..d33dcd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -27,9 +27,11 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -59,11 +61,14 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -903,7 +908,8 @@ public class TaskManagerTest {
 		return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort());
 	}
 
-	public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) {
+	public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, 
+												boolean useLocalCommunication, int dataPort) {
 		ActorRef taskManager = null;
 		try {
 			Configuration cfg = new Configuration();
@@ -916,7 +922,9 @@ public class TaskManagerTest {
 					cfg, system, "localhost",
 					Option.<String>empty(),
 					jobMangerUrl,
-					useLocalCommunication, TestingTaskManager.class);
+					useLocalCommunication,
+					StreamingMode.BATCH_ONLY,
+					TestingTaskManager.class);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
index 2b945e1..a033eb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.StreamingMode;
 import org.junit.Test;
 
 import java.io.File;
@@ -53,7 +54,8 @@ public class TestManagerStartupTest {
 			final int port = blocker.getLocalPort();
 
 			try {
-				TaskManager.runTaskManager(localHostName, port, new Configuration(), TaskManager.class);
+				TaskManager.runTaskManager(localHostName, port, new Configuration(),
+											StreamingMode.BATCH_ONLY, TaskManager.class);
 				fail("This should fail with an IOException");
 			}
 			catch (IOException e) {
@@ -101,7 +103,7 @@ public class TestManagerStartupTest {
 			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IOException e) {
@@ -138,7 +140,7 @@ public class TestManagerStartupTest {
 			// something invalid
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -150,7 +152,7 @@ public class TestManagerStartupTest {
 									ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
 			cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
 			try {
-				TaskManager.runTaskManager("localhost", 0, cfg);
+				TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 5fde5ea..5ae6b5b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -23,6 +23,7 @@ import java.net.InetAddress
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
@@ -122,8 +123,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   private def startTestingJobManager(system: ActorSystem): ActorRef = {
-    val (jm: ActorRef, _) = JobManager.startJobManagerActors(
-                                        new Configuration(), _system, None, None)
+    val (jm: ActorRef, _) = JobManager.startJobManagerActors(new Configuration(), _system,
+                                                             None, None,
+                                                             StreamingMode.BATCH_ONLY)
     jm
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9f9fe93..a904f60 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, Props, ActorSystem}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
@@ -33,9 +34,19 @@ import org.apache.flink.runtime.taskmanager.TaskManager
  * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
  *                          otherwise false
  */
-class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
+class TestingCluster(userConfiguration: Configuration,
+                     singleActorSystem: Boolean,
+                     streamingMode: StreamingMode)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+  
 
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
+        = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+  
+  // --------------------------------------------------------------------------
+  
   override def generateConfiguration(userConfig: Configuration): Configuration = {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -52,13 +63,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
     val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
     executionRetries, delayBetweenRetries,
     timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
-
+    
     val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+    
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
       libraryCacheManager, archive, accumulatorManager, executionRetries,
-      delayBetweenRetries, timeout) with TestingJobManager)
+      delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
 
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
@@ -72,12 +83,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
     } else {
       None
     }
-
+    
     TaskManager.startTaskManagerComponentsAndActor(configuration, system,
                                                    HOSTNAME,
                                                    Some(tmActorName),
                                                    jobManagerPath,
                                                    numTaskManagers == 1,
+                                                   streamingMode,
                                                    classOf[TestingTaskManager])
   }
 }

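TestingCluster stays source-compatible through telescoping constructors: the one- and two-argument forms delegate down to StreamingMode.BATCH_ONLY, so existing batch tests compile unchanged, while streaming tests pass the mode explicitly. For illustration, from a Java caller:

    // batch-only, single actor system (the old defaults)
    TestingCluster cluster = new TestingCluster(new Configuration());

    // streaming mode, with separate actor systems per component
    TestingCluster streamingCluster =
            new TestingCluster(new Configuration(), false, StreamingMode.STREAMING);
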
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 63dce31..3611633 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
+
 import com.typesafe.config.ConfigFactory
+
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.taskmanager.TaskManager
-import scala.concurrent.duration._
 
+import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext
 import scala.language.postfixOps
 
@@ -55,28 +56,7 @@ object TestingUtils {
   }
 
   def getDefaultTestingActorSystemConfig = testConfig
-
-
-  def startTestingTaskManagerWithConfiguration(hostname: String,
-                                               jobManagerURL: String,
-                                               config: Configuration,
-                                               system: ActorSystem) : ActorRef = {
-
-
-    TaskManager.startTaskManagerComponentsAndActor(config, system,
-                                                   hostname,
-                                                   None, // random actor name
-                                                   Some(jobManagerURL), // job manager
-                                                   true, classOf[TestingTaskManager])
-  }
-
-  def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
-
-    val jmURL = jobManager.path.toString
-    val config = new Configuration()
-
-    startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
-  }
+  
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 738654a..6fdd839 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -52,8 +52,6 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
@@ -65,7 +63,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
 public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 	// *************************************************************************
@@ -76,10 +74,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 	private String resultPath2;
 	private String expected1;
 	private String expected2;
-
-	public ComplexIntegrationTest(TestExecutionMode mode) {
-		super(mode);
-	}
+	
 
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 36e62f9..945ac07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 /**
  * Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -53,25 +53,34 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 
-	public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
-		super(mode);
-		switch(this.mode){
-			case CLUSTER:
-				TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
-				clusterEnv.setAsContext();
-				break;
-			case COLLECTION:
-				throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
-		}
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+	
+	// ------------------------------------------------------------------------
+	
+	public StreamingMultipleProgramsTestBase() {
+		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+		clusterEnv.setAsContext();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Cluster setup & teardown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
 	}
 
-	@Parameterized.Parameters(name = "Execution mode = {0}")
-	public static Collection<TestExecutionMode[]> executionModes() {
-		TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
-		ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
-		temsList.add(tems);
-		return temsList;
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}
 }

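With the Parameterized runner gone, a streaming test now simply extends the base class and pulls the context environment that the constructor installs (a TestStreamEnvironment backed by the shared mini cluster started in @BeforeClass). A minimal sketch of a subclass; class, method, and topology are illustrative, not part of this commit:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    import org.junit.Test;

    public class MyStreamingProgramTest extends StreamingMultipleProgramsTestBase {

        @Test
        public void testSomething() throws Exception {
            // returns the TestStreamEnvironment registered by the base class constructor
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3).print();
            env.execute();
        }
    }
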
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 0446b61..23be327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 public abstract class StreamingProgramTestBase extends AbstractTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 38be85e..3b7ab8d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAggregationTypes: Unit = {
+  def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -62,7 +62,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAggregationOnNonExistingField: Unit = {
+  def testAggregationOnNonExistingField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -74,7 +74,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test
-  def testWorkingAggregationDataTypes: Unit = {
+  def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(
@@ -88,7 +88,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test
-  def testAggregationWithArithmetic: Unit = {
+  def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
@@ -100,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingAggregationDataTypes: Unit = {
+  def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable
@@ -112,7 +112,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNoNestedAggregations: Unit = {
+  def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("Hello", 1)).toTable

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 3a0cc69..c6259ec 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -32,6 +32,7 @@ import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
 class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  
   private var resultPath: String = null
   private var expected: String = ""
   private val _tempFolder = new TemporaryFolder()
@@ -45,12 +46,12 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAs: Unit = {
+  def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -66,7 +67,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
+  def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -77,7 +78,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
+  def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -88,7 +89,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
+  def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -99,7 +100,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference1: Unit = {
+  def testAsWithNonFieldReference1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references
@@ -111,7 +112,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference2: Unit = {
+  def testAsWithNonFieldReference2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // as can only have field references

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 9557985..736cf68 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.scala.table.test
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -43,12 +43,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAutoCastToString: Unit = {
+  def testAutoCastToString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
@@ -60,7 +60,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testNumericAutoCastInArithmetic: Unit = {
+  def testNumericAutoCastInArithmetic(): Unit = {
 
     // don't test everything, just some common cast directions
 
@@ -74,7 +74,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testNumericAutoCastInComparison: Unit = {
+  def testNumericAutoCastInComparison(): Unit = {
 
     // don't test everything, just some common cast directions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8c60acf..d9de287 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testArithmetic: Unit = {
+  def testArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 10)).as('a, 'b)
@@ -61,7 +61,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testLogic: Unit = {
+  def testLogic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, true)).as('a, 'b)
@@ -73,7 +73,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testComparisons: Unit = {
+  def testComparisons(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
@@ -85,7 +85,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testBitwiseOperations: Unit = {
+  def testBitwiseOperations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -98,7 +98,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testBitwiseWithAutocast: Unit = {
+  def testBitwiseWithAutocast(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -111,7 +111,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testBitwiseWithNonWorkingAutocast: Unit = {
+  def testBitwiseWithNonWorkingAutocast(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -124,7 +124,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
   }
 
   @Test
-  def testCaseInsensitiveForAs: Unit = {
+  def testCaseInsensitiveForAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 982a302..bc51a7e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -47,12 +47,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAllRejectingFilter: Unit = {
+  def testAllRejectingFilter(): Unit = {
     /*
      * Test all-rejecting filter.
      */
@@ -67,7 +67,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testAllPassingFilter: Unit = {
+  def testAllPassingFilter(): Unit = {
     /*
      * Test all-passing filter.
      */
@@ -87,7 +87,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testFilterOnStringTupleField: Unit = {
+  def testFilterOnStringTupleField(): Unit = {
     /*
      * Test filter on String tuple field.
      */
@@ -100,7 +100,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testFilterOnIntegerTupleField: Unit = {
+  def testFilterOnIntegerTupleField(): Unit = {
     /*
      * Test filter on Integer tuple field.
      */
@@ -120,7 +120,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @Ignore
   @Test
-  def testFilterBasicType: Unit = {
+  def testFilterBasicType(): Unit = {
     /*
      * Test filter on basic type
      */
@@ -137,7 +137,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
   @Ignore
   @Test
-  def testFilterOnCustomType: Unit = {
+  def testFilterOnCustomType(): Unit = {
     /*
      * Test filter on custom type
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 1f29722..d76d75c 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testGroupingOnNonExistentField: Unit = {
+  def testGroupingOnNonExistentField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -63,7 +63,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @Test
-  def testGroupedAggregate: Unit = {
+  def testGroupedAggregate(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
     // if we don't want the key in the output
@@ -79,7 +79,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
   }
 
   @Test
-  def testGroupingKeyForwardIfNotUsed: Unit = {
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
     // if we don't want the key in the output

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 17221d8..b3baa56 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testJoin: Unit = {
+  def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -63,7 +63,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithFilter: Unit = {
+  def testJoinWithFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -76,7 +76,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithMultipleKeys: Unit = {
+  def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -90,7 +90,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinNonExistingKey: Unit = {
+  def testJoinNonExistingKey(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -103,7 +103,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinWithNonMatchingKeyTypes: Unit = {
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -116,7 +116,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testJoinWithAmbiguousFields: Unit = {
+  def testJoinWithAmbiguousFields(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
@@ -129,7 +129,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testJoinWithAggregation: Unit = {
+  def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 6ba6c9f..1a13d93 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testSimpleSelectAll: Unit = {
+  def testSimpleSelectAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
@@ -66,7 +66,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testSimpleSelectAllWithAs: Unit = {
+  def testSimpleSelectAllWithAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
@@ -82,7 +82,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test
-  def testSimpleSelectWithNaming: Unit = {
+  def testSimpleSelectWithNaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -97,7 +97,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
+  def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -108,7 +108,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
+  def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -119,7 +119,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
+  def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -131,7 +131,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
 
   @Test(expected = classOf[ExpressionException])
-  def testOnlyFieldRefInAs: Unit = {
+  def testOnlyFieldRefInAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 3f0f46f..65fe77a 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testSubstring: Unit = {
+  def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
       .select('a.substring(0, 'b))
@@ -60,7 +60,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test
-  def testSubstringWithMaxEnd: Unit = {
+  def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
       .select('a.substring('b))
@@ -71,7 +71,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring1: Unit = {
+  def testNonWorkingSubstring1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
@@ -83,7 +83,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
   }
 
   @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring2: Unit = {
+  def testNonWorkingSubstring2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
deleted file mode 100644
index ef81dfe..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Abstract base class for unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the ExecutionEnvironment from
- * the context:
- *
- * <pre>{@code
- *
- *   @Test
- *   public void someTest() {
- *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   @Test
- *   public void anotherTest() {
- *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * }</pre>
- */
-public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
-
-	/**
-	 * Enum that defines which execution environment to run the next test on:
-	 * An embedded local flink cluster, or the collection execution backend.
-	 */
-	public enum TestExecutionMode {
-		CLUSTER,
-		COLLECTION
-	}
-
-	// -----------------------------------------------------------------------------------------...
-
-	private static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster = null;
-
-	protected transient TestExecutionMode mode;
-
-	public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
-		this.mode = mode;
-	}
-
-	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c8b79ef..a5a7da9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -36,15 +37,16 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 
 	protected final Configuration config;
 
-	protected ForkableFlinkMiniCluster executor;
-
 	private final List<File> tempFiles;
 
+	private final FiniteDuration timeout;
+	
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
 	protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
-
-	private final FiniteDuration timeout;
+	
+	protected ForkableFlinkMiniCluster executor;
+	
 
 	public AbstractTestBase(Configuration config) {
 		this.config = config;
@@ -57,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
 
-	public void startCluster() throws Exception{
-		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, false);
+	public void startCluster() throws Exception {
+		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
 	}
 
 	public void stopCluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index e0c4360..8dab485 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.StreamingMode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -51,11 +54,36 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class MultipleProgramsTestBase extends TestBaseUtils {
 
+	/**
+	 * Enum that defines which execution environment to run the next test on:
+	 * An embedded local flink cluster, or the collection execution backend.
+	 */
+	public enum TestExecutionMode {
+		CLUSTER,
+		COLLECTION
+	}
+	
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	protected static boolean startWebServer = false;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+	
+	// ------------------------------------------------------------------------
+	
+	protected final TestExecutionMode mode;
+
+	
 	public MultipleProgramsTestBase(TestExecutionMode mode){
-		super(mode);
-		switch(this.mode){
+		this.mode = mode;
+		
+		switch(mode){
 			case CLUSTER:
 				TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
 				clusterEnv.setAsContext();
@@ -67,6 +95,24 @@ public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Cluster setup & teardown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Parametrization lets the tests run in cluster and collection mode
+	// ------------------------------------------------------------------------
+	
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes(){
 		return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},

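As a point of reference, the usage pattern documented in the deleted AbstractMultipleProgramsTestBase javadoc carries over unchanged to the restructured MultipleProgramsTestBase. A minimal sketch of a concrete test against it (the class name SomeITCase and the test body are illustrative only, not part of this commit):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// JUnit runs each test once per execution mode (CLUSTER and COLLECTION),
// as supplied by the executionModes() parameter factory in the base class.
@RunWith(Parameterized.class)
public class SomeITCase extends MultipleProgramsTestBase {

	public SomeITCase(TestExecutionMode mode) {
		super(mode);
	}

	@Test
	public void someTest() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// test code
		env.execute();
	}
}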
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b0e4ae9..29cb2a4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,46 +80,55 @@ public class TestBaseUtils {
 
 	protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
 
-	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration
-			(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 
+	// ------------------------------------------------------------------------
+	
 	protected static File logDir;
 
 	protected TestBaseUtils(){
 		verifyJvmOptions();
 	}
-
-	private void verifyJvmOptions() {
+	
+	private static void verifyJvmOptions() {
 		long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
-
-	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
-			taskManagerNumSlots, boolean startWebserver) throws Exception {
+	
+	
+	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+															int taskManagerNumSlots,
+															StreamingMode mode,
+															boolean startWebserver) throws Exception {
+		
 		logDir = File.createTempFile("TestBaseUtils-logdir", null);
 		Assert.assertTrue("Unable to delete temp file", logDir.delete());
 		Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
+	
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
-		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
+
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+		
 		config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, startWebserver);
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
-		return new ForkableFlinkMiniCluster(config);
+		
+		return new ForkableFlinkMiniCluster(config, true, mode);
 	}
 
-	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout)
-			throws Exception {
-		if(logDir != null) {
-			logDir.delete();
+	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+		if (logDir != null) {
+			FileUtils.deleteDirectory(logDir);
 		}
-		if(executor != null) {
+		if (executor != null) {
 			int numUnreleasedBCVars = 0;
 			int numActiveConnections = 0;
 			{
@@ -169,11 +179,11 @@ public class TestBaseUtils {
 	//  Result Checking
 	// --------------------------------------------------------------------------------------------
 
-	public BufferedReader[] getResultReader(String resultPath) throws IOException {
+	public static BufferedReader[] getResultReader(String resultPath) throws IOException {
 		return getResultReader(resultPath, new String[]{}, false);
 	}
 
-	public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
+	public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
 											boolean inOrderOfFiles) throws IOException {
 		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 
@@ -206,12 +216,11 @@ public class TestBaseUtils {
 
 
 
-	public BufferedInputStream[] getResultInputStream(String resultPath) throws
-			IOException {
+	public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
 		return getResultInputStream(resultPath, new String[]{});
 	}
 
-	public BufferedInputStream[] getResultInputStream(String resultPath, String[]
+	public static BufferedInputStream[] getResultInputStream(String resultPath, String[]
 			excludePrefixes) throws IOException {
 		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
@@ -221,37 +230,37 @@ public class TestBaseUtils {
 		return inStreams;
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath) throws
-			IOException {
+	public static void readAllResultLines(List<String> target, String resultPath) throws IOException {
 		readAllResultLines(target, resultPath, new String[]{});
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath, String[]
-			excludePrefixes) throws IOException {
+	public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) 
+			throws IOException {
+		
 		readAllResultLines(target, resultPath, excludePrefixes, false);
 	}
 
-	public void readAllResultLines(List<String> target, String resultPath, String[]
+	public static void readAllResultLines(List<String> target, String resultPath, String[]
 			excludePrefixes, boolean inOrderOfFiles) throws IOException {
 		for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
-			String s = null;
+			String s;
 			while ((s = reader.readLine()) != null) {
 				target.add(s);
 			}
 		}
 	}
 
-	public void compareResultsByLinesInMemory(String expectedResultStr, String
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String
 			resultPath) throws Exception {
 		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
 	}
 
-	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
 											String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 		Arrays.sort(result);
 
 		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
@@ -261,18 +270,18 @@ public class TestBaseUtils {
 		Assert.assertArrayEquals(expected, result);
 	}
 
-	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
 																	String resultPath) throws
 			Exception {
 		compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
 	}
 
-	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
 																	String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, true);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 
 		String[] expected = expectedResultStr.split("\n");
 
@@ -280,7 +289,7 @@ public class TestBaseUtils {
 		Assert.assertArrayEquals(expected, result);
 	}
 
-	public void checkLinesAgainstRegexp(String resultPath, String regexp){
+	public static void checkLinesAgainstRegexp(String resultPath, String regexp){
 		Pattern pattern = Pattern.compile(regexp);
 		Matcher matcher = pattern.matcher("");
 
@@ -301,17 +310,17 @@ public class TestBaseUtils {
 
 	}
 
-	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
 											String delimiter, double maxDelta) throws Exception {
 		compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
 	}
 
-	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+	public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
 											String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
 		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-		String[] result = (String[]) list.toArray(new String[list.size()]);
+		String[] result = list.toArray(new String[list.size()]);
 		String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
 
 		Assert.assertEquals("Wrong number of result lines.", expected.length, result.length);
@@ -330,7 +339,7 @@ public class TestBaseUtils {
 		}
 	}
 
-	public <X> void compareResultCollections(List<X> expected, List<X> actual,
+	public static <X> void compareResultCollections(List<X> expected, List<X> actual,
 											Comparator<X> comparator) {
 		Assert.assertEquals(expected.size(), actual.size());
 
@@ -445,8 +454,8 @@ public class TestBaseUtils {
 	protected static void deleteRecursively(File f) throws IOException {
 		if (f.isDirectory()) {
 			FileUtils.deleteDirectory(f);
-		} else {
-			f.delete();
+		} else if (!f.delete()) {
+			System.err.println("Failed to delete file " + f.getAbsolutePath());
 		}
 	}
 	
@@ -469,13 +478,13 @@ public class TestBaseUtils {
 	// Web utils
 	//---------------------------------------------------------------------------------------------
 
-	public static String getFromHTTP(String url) throws Exception{
+	public static String getFromHTTP(String url) throws Exception {
 		URL u = new URL(url);
 		LOG.info("Accessing URL "+url+" as URL: "+u);
 		HttpURLConnection connection = (HttpURLConnection) u.openConnection();
 		connection.setConnectTimeout(100000);
 		connection.connect();
-		InputStream is = null;
+		InputStream is;
 		if(connection.getResponseCode() >= 400) {
 			// error!
 			LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
@@ -486,5 +495,4 @@ public class TestBaseUtils {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 2e664c1..3ea8624 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util
 
+import org.apache.flink.runtime.StreamingMode
 import org.scalatest.{Suite, BeforeAndAfter}
 
 /** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
@@ -46,17 +47,16 @@ import org.scalatest.{Suite, BeforeAndAfter}
   *          }}}
   *
   */
-trait FlinkTestBase
-  extends BeforeAndAfter {
+trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
   var cluster: Option[ForkableFlinkMiniCluster] = None
   val parallelism = 4
 
   before {
-    val cl = TestBaseUtils.startCluster(1, parallelism, false)
+    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
     val clusterEnvironment = new TestEnvironment(cl, parallelism)
-    clusterEnvironment.setAsContext
+    clusterEnvironment.setAsContext()
 
     cluster = Some(cl)
   }


[2/4] flink git commit: [FLINK-2085] [runtime] Add an option to let the MemoryManager allocate and release memory as needed.

Posted by se...@apache.org.
[FLINK-2085] [runtime] Add an option to let the MemoryManager allocate and release memory as needed.

This is an alternative to the current mode, which pre-allocates all memory.

The default remains to pre-allocate all memory.

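To make the new mode concrete, here is a minimal sketch of the allocate/release cycle from a caller's perspective (the DummyOwner class and the sizes are illustrative, mirroring the tests added in this commit; only the final boolean constructor argument is new):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;

public class LazyAllocationSketch {

	public static void main(String[] args) throws Exception {
		// 64 MiB of managed memory, 1 slot, 32 KiB pages; 'false' selects
		// the new lazy mode, so no pages are allocated in the constructor
		DefaultMemoryManager memoryManager =
				new DefaultMemoryManager(64L * 1024 * 1024, 1, 32 * 1024, false);

		AbstractInvokable owner = new DummyOwner();
		List<MemorySegment> segments = new ArrayList<MemorySegment>();

		// in lazy mode the backing byte arrays are created only here
		memoryManager.allocatePages(owner, segments, 16);

		// ... use the segments ...

		// in lazy mode this returns the pages to the non-allocated budget and
		// lets the byte arrays become garbage; with 'true' above they would
		// go back into the manager's free list instead
		memoryManager.release(segments);

		memoryManager.shutdown();
	}

	/** Utility owner for the memory, mirroring the tests' DummyInvokable. */
	private static final class DummyOwner extends AbstractInvokable {
		@Override
		public void registerInputOutput() {}

		@Override
		public void invoke() throws Exception {}
	}
}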

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea60678e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea60678e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea60678e

Branch: refs/heads/master
Commit: ea60678e9ecdcf954cddf0ce6d0e0b6da01b4cbc
Parents: 924830f
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 18:35:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200

----------------------------------------------------------------------
 .../memorymanager/DefaultMemoryManager.java     | 138 ++++++++----
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |   2 +-
 .../io/disk/FileChannelStreamsITCase.java       |   2 +-
 .../runtime/io/disk/FileChannelStreamsTest.java |   4 +-
 .../disk/SeekableFileChannelInputViewTest.java  |   2 +-
 .../io/disk/iomanager/IOManagerITCase.java      |   4 +-
 .../IOManagerPerformanceBenchmark.java          |   4 +-
 .../memory/DefaultMemoryManagerTest.java        | 197 -----------------
 .../memory/MemoryManagerLazyAllocationTest.java | 209 +++++++++++++++++++
 .../flink/runtime/memory/MemoryManagerTest.java | 208 ++++++++++++++++++
 .../flink/runtime/memory/MemorySegmentTest.java |   4 +-
 .../operators/drivers/TestTaskContext.java      |   2 +-
 .../NonReusingReOpenableHashTableITCase.java    |  10 +-
 .../hash/ReusingReOpenableHashTableITCase.java  |  10 +-
 .../sort/LargeRecordHandlerITCase.java          |   4 +-
 .../operators/sort/LargeRecordHandlerTest.java  |   6 +-
 ...askManagerComponentsStartupShutdownTest.java |   2 +-
 18 files changed, 539 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 28ebe13..b041ac9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -66,16 +66,21 @@ public class DefaultMemoryManager implements MemoryManager {
 	private final int pageSizeBits;			// the number of bits that the power-of-two page size corresponds to
 	
 	private final int totalNumPages;		// The initial total size, for verification.
-	
-	private boolean isShutDown;				// flag whether the close() has already been invoked.
-
-	/**
-	 * Number of slots of the task manager
-	 */
-	private final int numberOfSlots;
 
+	/** The total size of the memory managed by this memory manager */
 	private final long memorySize;
 
+	/** Number of slots of the task manager */
+	private final int numberOfSlots;
+	
+	private final boolean isPreAllocated;
+	
+	/** The number of memory pages that have not been allocated and are available for lazy allocation */
+	private int numNonAllocatedPages;
+	
+	/** flag whether the close() has already been invoked */
+	private boolean isShutDown;
+
 	// ------------------------------------------------------------------------
 	// Constructors / Destructors
 	// ------------------------------------------------------------------------
@@ -87,7 +92,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * @param numberOfSlots The number of slots of the task manager.
 	 */
 	public DefaultMemoryManager(long memorySize, int numberOfSlots) {
-		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
+		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE, true);
 	}
 
 	/**
@@ -96,8 +101,10 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
 	 * @param numberOfSlots The number of slots of the task manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
+	 * @param preAllocateMemory True, if the memory manager should immediately allocate all memory, false
+	 *                          if it should allocate and release the memory as needed.
 	 */
-	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
+	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize, boolean preAllocateMemory) {
 		// sanity checks
 		if (memorySize <= 0) {
 			throw new IllegalArgumentException("Size of total memory must be positive.");
@@ -132,11 +139,17 @@ public class DefaultMemoryManager implements MemoryManager {
 		this.freeSegments = new ArrayDeque<byte[]>(this.totalNumPages);
 		this.allocatedSegments = new HashMap<AbstractInvokable, Set<DefaultMemorySegment>>();
 
+		this.isPreAllocated = preAllocateMemory;
 		
-		// add the full chunks
-		for (int i = 0; i < this.totalNumPages; i++) {
-			// allocate memory of the specified size
-			this.freeSegments.add(new byte[this.pageSize]);
+		if (preAllocateMemory) {
+			// add the full chunks
+			for (int i = 0; i < this.totalNumPages; i++) {
+				// allocate memory of the specified size
+				this.freeSegments.add(new byte[this.pageSize]);
+			}
+		}
+		else {
+			this.numNonAllocatedPages = this.totalNumPages;
 		}
 	}
 
@@ -147,12 +160,14 @@ public class DefaultMemoryManager implements MemoryManager {
 		{
 			if (!this.isShutDown) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Shutting down MemoryManager instance " + toString());
+					LOG.debug("Shutting down MemoryManager instance " + this);
 				}
 	
 				// mark as shutdown and release memory
 				this.isShutDown = true;
+				
 				this.freeSegments.clear();
+				this.numNonAllocatedPages = 0;
 				
 				// go over all allocated segments and release them
 				for (Set<DefaultMemorySegment> segments : this.allocatedSegments.values()) {
@@ -173,7 +188,9 @@ public class DefaultMemoryManager implements MemoryManager {
 	@Override
 	public boolean verifyEmpty() {
 		synchronized (this.lock) {
-			return this.freeSegments.size() == this.totalNumPages;
+			return isPreAllocated ?
+					this.freeSegments.size() == this.totalNumPages :
+					this.numNonAllocatedPages == this.totalNumPages;
 		}
 	}
 
@@ -209,7 +226,9 @@ public class DefaultMemoryManager implements MemoryManager {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
 			
-			if (numPages > this.freeSegments.size()) {
+			// in the case of pre-allocated memory, 'numNonAllocatedPages' is zero; in the
+			// lazy case, 'freeSegments.size()' is zero.
+			if (numPages > (this.freeSegments.size() + numNonAllocatedPages)) {
 				throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " + 
 					this.freeSegments.size() + " pages are remaining.");
 			}
@@ -220,11 +239,22 @@ public class DefaultMemoryManager implements MemoryManager {
 				this.allocatedSegments.put(owner, segmentsForOwner);
 			}
 			
-			for (int i = numPages; i > 0; i--) {
-				byte[] buffer = this.freeSegments.poll();
-				final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
-				target.add(segment);
-				segmentsForOwner.add(segment);
+			if (isPreAllocated) {
+				for (int i = numPages; i > 0; i--) {
+					byte[] buffer = this.freeSegments.poll();
+					final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+					target.add(segment);
+					segmentsForOwner.add(segment);
+				}
+			}
+			else {
+				for (int i = numPages; i > 0; i--) {
+					byte[] buffer = new byte[pageSize];
+					final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+					target.add(segment);
+					segmentsForOwner.add(segment);
+				}
+				numNonAllocatedPages -= numPages;
 			}
 		}
 		// -------------------- END CRITICAL SECTION -------------------
@@ -259,14 +289,19 @@ public class DefaultMemoryManager implements MemoryManager {
 						this.allocatedSegments.remove(owner);
 					}
 				}
+
+				byte[] buffer = defSeg.destroy();
+				
+				if (isPreAllocated) {
+					// release the memory in any case
+					this.freeSegments.add(buffer);
+				}
+				else {
+					numNonAllocatedPages++;
+				}
 			}
 			catch (Throwable t) {
-				LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
-			}
-			finally {
-				// release the memory in any case
-				byte[] buffer = defSeg.destroy();
-				this.freeSegments.add(buffer);
+				throw new RuntimeException("Error removing book-keeping reference to allocated memory segment.", t);
 			}
 		}
 		// -------------------- END CRITICAL SECTION -------------------
@@ -286,8 +321,7 @@ public class DefaultMemoryManager implements MemoryManager {
 			}
 
 			// since concurrent modifications to the collection
-			// can disturb the release, we need to try potentially
-			// multiple times
+			// can disturb the release, we need to try potentially multiple times
 			boolean successfullyReleased = false;
 			do {
 				final Iterator<T> segmentsIterator = segments.iterator();
@@ -322,12 +356,20 @@ public class DefaultMemoryManager implements MemoryManager {
 									this.allocatedSegments.remove(owner);
 								}
 							}
-						} catch (Throwable t) {
-							LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
-						} finally {
+
 							// release the memory in any case
 							byte[] buffer = defSeg.destroy();
-							this.freeSegments.add(buffer);
+							
+							if (isPreAllocated) {
+								this.freeSegments.add(buffer);
+							}
+							else {
+								numNonAllocatedPages++;
+							}
+						}
+						catch (Throwable t) {
+							throw new RuntimeException(
+									"Error removing book-keeping reference to allocated memory segment.", t);
 						}
 					}
 
@@ -363,9 +405,17 @@ public class DefaultMemoryManager implements MemoryManager {
 			}
 
 			// free each segment
-			for (DefaultMemorySegment seg : segments) {
-				final byte[] buffer = seg.destroy();
-				this.freeSegments.add(buffer);
+			if (isPreAllocated) {
+				for (DefaultMemorySegment seg : segments) {
+					final byte[] buffer = seg.destroy();
+					this.freeSegments.add(buffer);
+				}
+			}
+			else {
+				for (DefaultMemorySegment seg : segments) {
+					seg.destroy();
+				}
+				numNonAllocatedPages += segments.size();
 			}
 
 			segments.clear();
@@ -387,12 +437,16 @@ public class DefaultMemoryManager implements MemoryManager {
 
 	@Override
 	public int computeNumberOfPages(double fraction) {
-		return getRelativeNumPages(fraction);
+		if (fraction <= 0 || fraction > 1) {
+			throw new IllegalArgumentException("The fraction of memory to allocate must be within (0, 1].");
+		}
+
+		return (int)(this.totalNumPages * fraction / this.numberOfSlots);
 	}
 
 	@Override
 	public long computeMemorySize(double fraction) {
-		return this.pageSize*computeNumberOfPages(fraction);
+		return this.pageSize * computeNumberOfPages(fraction);
 	}
 
 	@Override
@@ -414,14 +468,6 @@ public class DefaultMemoryManager implements MemoryManager {
 			throw new IllegalArgumentException("The given number of bytes corresponds to more than MAX_INT pages.");
 		}
 	}
-
-	private int getRelativeNumPages(double fraction){
-		if (fraction <= 0 || fraction > 1) {
-			throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
-		}
-
-		return (int)(this.totalNumPages * fraction / this.numberOfSlots);
-	}
 	
 	// ------------------------------------------------------------------------
 	

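As a sanity check on the rewritten computeNumberOfPages / computeMemorySize above, a small worked example using the constants from the tests in this commit (the 4-slot figure is assumed for illustration): 72 MiB of managed memory at 32 KiB pages gives 2304 pages in total, so a fraction of 0.5 spread over 4 slots yields (int) (2304 * 0.5 / 4) = 288 pages, i.e. 288 * 32 KiB = 9 MiB.

DefaultMemoryManager memMan =
		new DefaultMemoryManager(72L * 1024 * 1024, 4, 32 * 1024, true);

int pages = memMan.computeNumberOfPages(0.5);  // (int) (2304 * 0.5 / 4) = 288
long bytes = memMan.computeMemorySize(0.5);    // 288 * 32768 = 9437184 bytes (9 MiB)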
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 65ce7dc..a95b5cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1396,8 +1396,10 @@ object TaskManager {
     val memoryManager = try {
       new DefaultMemoryManager(memorySize,
                                taskManagerConfig.numberOfSlots,
-                               netConfig.networkBufferSize)
-    } catch {
+                               netConfig.networkBufferSize,
+                               true)
+    }
+    catch {
       case e: OutOfMemoryError => throw new Exception(
         "OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
           memorySize + " bytes).", e)

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 0462b3f..f2a5e2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -78,7 +78,7 @@ public class ChannelViewsTest
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 85d2113..dcc1e5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -66,7 +66,7 @@ public class FileChannelStreamsITCase {
 
 	@Before
 	public void beforeTest() {
-		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE);
+		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE, true);
 		ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1f6899d..f9b8b38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -44,7 +44,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteOutputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
@@ -78,7 +78,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteInputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index f090ef1..c071bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -45,7 +45,7 @@ public class SeekableFileChannelInputViewTest {
 		// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
 		
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE);
+			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 9abedb3..a0a8356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 
 /**
@@ -82,7 +82,7 @@ public class IOManagerITCase {
 	@SuppressWarnings("unchecked")
 	public void parallelChannelsTest() throws Exception {
 		final Random rnd = new Random(SEED);
-		final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
+		final AbstractInvokable memOwner = new MemoryManagerTest.DummyInvokable();
 		
 		FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
 		BlockChannelWriter<MemorySegment>[] writers = new BlockChannelWriter[NUM_CHANNELS];

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index c9ca9fb..c71e181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +62,7 @@ public class IOManagerPerformanceBenchmark {
 	private static final int NUM_INTS_WRITTEN = 100000000;
 	
 	
-	private static final AbstractInvokable memoryOwner = new DefaultMemoryManagerTest.DummyInvokable();
+	private static final AbstractInvokable memoryOwner = new MemoryManagerTest.DummyInvokable();
 	
 	private DefaultMemoryManager memManager;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
deleted file mode 100644
index afd40a2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.memory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DefaultMemoryManagerTest
-{
-	private static final long RANDOM_SEED = 643196033469871L;
-
-	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
-
-	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-	
-	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
-
-	private DefaultMemoryManager memoryManager;
-
-	private Random random;
-
-	@Before
-	public void setUp()
-	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
-		this.random = new Random(RANDOM_SEED);
-	}
-
-	@After
-	public void tearDown()
-	{
-		if (!this.memoryManager.verifyEmpty()) {
-			Assert.fail("Memory manager is not complete empty and valid at the end of the test.");
-		}
-		this.memoryManager = null;
-		this.random = null;
-	}
-
-	@Test
-	public void allocateAllSingle() throws Exception
-	{
-		final AbstractInvokable mockInvoke = new DummyInvokable();
-		List<MemorySegment> segments = new ArrayList<MemorySegment>();
-		
-		try {
-			for (int i = 0; i < NUM_PAGES; i++) {
-				segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
-			}
-		} catch (MemoryAllocationException e) {
-			Assert.fail("Unable to allocate memory");
-		}
-		
-		this.memoryManager.release(segments);
-	}
-	
-	@Test
-	public void allocateAllMulti() throws Exception
-	{
-		final AbstractInvokable mockInvoke = new DummyInvokable();
-		final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-		
-		try {
-			for(int i = 0; i < NUM_PAGES / 2; i++) {
-				segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
-			}
-		} catch (MemoryAllocationException e) {
-			Assert.fail("Unable to allocate memory");
-		}
-		
-		this.memoryManager.release(segments);
-	}
-	
-	@Test
-	public void allocateMultipleOwners()
-	{
-		final int NUM_OWNERS = 17;
-	
-		try {
-			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-			@SuppressWarnings("unchecked")
-			List<MemorySegment>[] mems = new List[NUM_OWNERS];
-			
-			for (int i = 0; i < NUM_OWNERS; i++) {
-				owners[i] = new DummyInvokable();
-				mems[i] = new ArrayList<MemorySegment>(64);
-			}
-			
-			// allocate all memory to the different owners
-			for (int i = 0; i < NUM_PAGES; i++) {
-				final int owner = this.random.nextInt(NUM_OWNERS);
-				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
-			}
-			
-			// free one owner at a time
-			for (int i = 0; i < NUM_OWNERS; i++) {
-				this.memoryManager.releaseAll(owners[i]);
-				owners[i] = null;
-				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
-				mems[i] = null;
-				
-				// check that the owner owners were not affected
-				for (int k = i+1; k < NUM_OWNERS; k++) {
-					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void allocateTooMuch()
-	{
-		try {
-			final AbstractInvokable mockInvoke = new DummyInvokable();
-			
-			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-			
-			try {
-				this.memoryManager.allocatePages(mockInvoke, 1);
-				Assert.fail("Expected MemoryAllocationException.");
-			} catch (MemoryAllocationException maex) {
-				// expected
-			}
-			
-			Assert.assertTrue("The previously allocated segments were not valid any more.",
-																	allMemorySegmentsValid(segs));
-			
-			this.memoryManager.releaseAll(mockInvoke);			
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an exception: " + e.getMessage());
-		}
-	}
-	
-	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs)
-	{
-		for (MemorySegment seg : memSegs) {
-			if (seg.isFreed()) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs)
-	{
-		for (MemorySegment seg : memSegs) {
-			if (!seg.isFreed()) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	/**
-	 * Utility class to serve as owner for the memory.
-	 */
-	public static final class DummyInvokable extends AbstractInvokable {
-		@Override
-		public void registerInputOutput() {}
-
-		@Override
-		public void invoke() throws Exception {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
new file mode 100644
index 0000000..e077d08
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the memory manager, in the mode where it lazily allocates and releases memory as needed.
+ */
+public class MemoryManagerLazyAllocationTest {
+	
+	private static final long RANDOM_SEED = 643196033469871L;
+
+	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+	
+	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+	private DefaultMemoryManager memoryManager;
+
+	private Random random;
+
+	
+	@Before
+	public void setUp() {
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, false);
+		this.random = new Random(RANDOM_SEED);
+	}
+
+	@After
+	public void tearDown() {
+		if (!this.memoryManager.verifyEmpty()) {
+			fail("Memory manager is not completely empty and valid at the end of the test.");
+		}
+		this.memoryManager = null;
+		this.random = null;
+	}
+
+	@Test
+	public void allocateAllSingle() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for (int i = 0; i < NUM_PAGES; i++) {
+					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+				}
+			}
+			catch (MemoryAllocationException e) {
+				fail("Unable to allocate memory");
+			}
+			
+			for (MemorySegment seg : segments) {
+				this.memoryManager.release(seg);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateAllMulti() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for(int i = 0; i < NUM_PAGES / 2; i++) {
+					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+				}
+			} catch (MemoryAllocationException e) {
+				Assert.fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateMultipleOwners() {
+		final int NUM_OWNERS = 17;
+	
+		try {
+			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+			
+			@SuppressWarnings("unchecked")
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+			
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				owners[i] = new DummyInvokable();
+				mems[i] = new ArrayList<MemorySegment>(64);
+			}
+			
+			// allocate all memory to the different owners
+			for (int i = 0; i < NUM_PAGES; i++) {
+				final int owner = this.random.nextInt(NUM_OWNERS);
+				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+			}
+			
+			// free one owner at a time
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				this.memoryManager.releaseAll(owners[i]);
+				owners[i] = null;
+				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+				mems[i] = null;
+				
+				// check that the other owners were not affected
+				for (int k = i+1; k < NUM_OWNERS; k++) {
+					Assert.assertTrue("Non-released memory segments are accidentally destroyed.", allMemorySegmentsValid(mems[k]));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateTooMuch() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			
+			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+			
+			try {
+				this.memoryManager.allocatePages(mockInvoke, 1);
+				Assert.fail("Expected MemoryAllocationException.");
+			} catch (MemoryAllocationException maex) {
+				// expected
+			}
+			
+			Assert.assertTrue("The previously allocated segments were not valid any more.",
+																	allMemorySegmentsValid(segs));
+			
+			this.memoryManager.releaseAll(mockInvoke);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (!seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	/**
+	 * Utility class to serve as owner for the memory.
+	 */
+	public static final class DummyInvokable extends AbstractInvokable {
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
new file mode 100644
index 0000000..c211357
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the memory manager, in the mode where it pre-allocates all memory.
+ */
+public class MemoryManagerTest {
+	
+	private static final long RANDOM_SEED = 643196033469871L;
+
+	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+	
+	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+	private DefaultMemoryManager memoryManager;
+
+	private Random random;
+
+	
+	@Before
+	public void setUp() {
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
+		this.random = new Random(RANDOM_SEED);
+	}
+
+	@After
+	public void tearDown() {
+		if (!this.memoryManager.verifyEmpty()) {
+			fail("Memory manager is not completely empty and valid at the end of the test.");
+		}
+		this.memoryManager = null;
+		this.random = null;
+	}
+
+	@Test
+	public void allocateAllSingle() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for (int i = 0; i < NUM_PAGES; i++) {
+					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+				}
+			}
+			catch (MemoryAllocationException e) {
+				fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateAllMulti() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for (int i = 0; i < NUM_PAGES / 2; i++) {
+					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+				}
+			} catch (MemoryAllocationException e) {
+				fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateMultipleOwners() {
+		final int NUM_OWNERS = 17;
+	
+		try {
+			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+			
+			@SuppressWarnings("unchecked")
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+			
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				owners[i] = new DummyInvokable();
+				mems[i] = new ArrayList<MemorySegment>(64);
+			}
+			
+			// allocate all memory to the different owners
+			for (int i = 0; i < NUM_PAGES; i++) {
+				final int owner = this.random.nextInt(NUM_OWNERS);
+				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+			}
+			
+			// free one owner at a time
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				this.memoryManager.releaseAll(owners[i]);
+				owners[i] = null;
+				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+				mems[i] = null;
+				
+				// check that the other owners were not affected
+				for (int k = i+1; k < NUM_OWNERS; k++) {
+					Assert.assertTrue("Non-released memory segments were accidentally destroyed.", allMemorySegmentsValid(mems[k]));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateTooMuch() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			
+			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+			
+			try {
+				this.memoryManager.allocatePages(mockInvoke, 1);
+				Assert.fail("Expected MemoryAllocationException.");
+			} catch (MemoryAllocationException maex) {
+				// expected
+			}
+			
+			Assert.assertTrue("The previously allocated segments were not valid any more.",
+																	allMemorySegmentsValid(segs));
+			
+			this.memoryManager.releaseAll(mockInvoke);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (!seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	/**
+	 * Utility class to serve as owner for the memory.
+	 */
+	public static final class DummyInvokable extends AbstractInvokable {
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {}
+	}
+}

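As a quick orientation for the API exercised above, here is a minimal sketch of the same allocate/release round trip outside of JUnit. The class name PreAllocationSketch is invented for illustration; the constructor signature and the reading of the fourth boolean argument (pre-allocate all pages when true) follow the test's Javadoc and the call sites updated in this diff:

    package org.apache.flink.runtime.memory;   // same package as MemoryManagerTest

    import java.util.List;

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;

    public class PreAllocationSketch {

        public static void main(String[] args) throws Exception {
            // Same parameters as MemoryManagerTest: 72 MiB of managed memory,
            // one slot, 32 KiB pages, and eager pre-allocation (fourth argument).
            DefaultMemoryManager memMan =
                    new DefaultMemoryManager(72 * 1024 * 1024, 1, 32 * 1024, true);

            // Claim a few pages for one owner, then hand everything back.
            List<MemorySegment> pages =
                    memMan.allocatePages(new MemoryManagerTest.DummyInvokable(), 4);
            memMan.release(pages);

            // Same check the test performs in tearDown().
            if (!memMan.verifyEmpty()) {
                throw new IllegalStateException("managed memory was leaked");
            }
        }
    }
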
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
index cefd0c5..f9adb94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
@@ -51,8 +51,8 @@ public class MemorySegmentTest {
 	@Before
 	public void setUp() throws Exception{
 		try {
-			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE);
-			this.segment = manager.allocatePages(new DefaultMemoryManagerTest.DummyInvokable(), 1).get(0);
+			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, true);
+			this.segment = manager.allocatePages(new MemoryManagerTest.DummyInvokable(), 1).get(0);
 			this.random = new Random(RANDOM_SEED);
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 02bffec..b1466c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -69,7 +69,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	public TestTaskContext() {}
 	
 	public TestTaskContext(long memoryInBytes) {
-		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024);
+		this.memoryManager = new DefaultMemoryManager(memoryInBytes, 1, 32 * 1024, true);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 5012d1e..f5105bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -101,10 +101,9 @@ public class NonReusingReOpenableHashTableITCase {
 	private TypeComparator<Record> recordProbeSideComparator;
 	private TypePairComparator<Record, Record> pactRecordComparator;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.recordSerializer = RecordSerializer.get();
 
 		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
@@ -121,13 +120,12 @@ public class NonReusingReOpenableHashTableITCase {
 		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index fd2f906..7172887 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -100,10 +100,9 @@ public class ReusingReOpenableHashTableITCase {
 	private TypeComparator<Record> recordProbeSideComparator;
 	private TypePairComparator<Record, Record> pactRecordComparator;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.recordSerializer = RecordSerializer.get();
 		
 		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
@@ -120,13 +119,12 @@ public class ReusingReOpenableHashTableITCase {
 		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 498cb61..429062b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -62,7 +62,7 @@ public class LargeRecordHandlerITCase {
 		final int NUM_RECORDS = 10;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -203,7 +203,7 @@ public class LargeRecordHandlerITCase {
 		FileIOChannel.ID channel = null;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index 6eb736f..423ff9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -49,7 +49,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_PAGES = 50;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
 			
@@ -101,7 +101,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -189,7 +189,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);

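The final file below passes false as the fourth constructor argument instead. A companion sketch of that mode, under the assumption that false defers claiming pages until allocatePages() is called rather than reserving the whole budget at construction time; LazyAllocationSketch is likewise an invented name, and the sizes mirror the 32 * BUFFER_SIZE setup in that test:

    package org.apache.flink.runtime.memory;   // same package as MemoryManagerTest

    import java.util.List;

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;

    public class LazyAllocationSketch {

        public static void main(String[] args) throws Exception {
            // 32 pages of 32 KiB, one slot; `false` is assumed to mean pages
            // are claimed on demand instead of being reserved up front.
            DefaultMemoryManager memMan =
                    new DefaultMemoryManager(32 * 32 * 1024, 1, 32 * 1024, false);

            // Under that assumption, memory is actually acquired only here:
            List<MemorySegment> segments =
                    memMan.allocatePages(new MemoryManagerTest.DummyInvokable(), 2);
            memMan.release(segments);
        }
    }
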
http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index dca3c58..7d4994d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -85,7 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 
-			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE);
+			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
 			final int numberOfSlots = 1;