You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 12:46:05 UTC

[1/6] flink git commit: [hotfix] Add retrieval of key sets to DualKeyMap

Repository: flink
Updated Branches:
  refs/heads/master 51a278778 -> 4afd445b3


[hotfix] Add retrieval of key sets to DualKeyMap


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff670946
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff670946
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff670946

Branch: refs/heads/master
Commit: ff67094671e51679ea79e8e518f18348923feb06
Parents: 057edf9
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 14:11:20 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 12:30:30 2018 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/slotpool/DualKeyMap.java  |  9 +++
 .../jobmaster/slotpool/DualKeyMapTest.java      | 60 ++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff670946/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
index 04b3ca6..38bcd49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
@@ -24,6 +24,7 @@ import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Set;
 
 /**
  * Map which stores values under two different indices.
@@ -125,6 +126,14 @@ public class DualKeyMap<A, B, V> {
 		return vs;
 	}
 
+	public Set<A> keySetA() {
+		return aMap.keySet(); // returns aMap's own key set as-is; no defensive copy is made here
+	}
+
+	public Set<B> keySetB() {
+		return bMap.keySet(); // returns bMap's own key set as-is; no defensive copy is made here
+	}
+
 	public void clear() {
 		aMap.clear();
 		bMap.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/ff670946/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
new file mode 100644
index 0000000..1500d24
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link DualKeyMap}; covers the {@code keySetA()} and {@code keySetB()} accessors.
+ */
+public class DualKeyMapTest extends TestLogger {
+
+	@Test
+	public void testKeySets() {
+		final Random random = new Random();
+		final int capacity = 10;
+		final Set<Tuple2<Integer, Integer>> keys = new HashSet<>(capacity);
+		// collect up to 'capacity' random (keyA, keyB) pairs; Random is unseeded, so keys vary per run
+		for (int i = 0; i < capacity; i++) {
+			int keyA = random.nextInt();
+			int keyB = random.nextInt();
+			keys.add(Tuple2.of(keyA, keyB));
+		}
+
+		final DualKeyMap<Integer, Integer, String> dualKeyMap = new DualKeyMap<>(capacity);
+		// store the same dummy value under every key pair; only the keys are checked below
+		for (Tuple2<Integer, Integer> key : keys) {
+			dualKeyMap.put(key.f0, key.f1, "foobar");
+		}
+		// each key set must equal the projection of the inserted pairs onto f0 and f1 respectively
+		assertThat(dualKeyMap.keySetA(), Matchers.equalTo(keys.stream().map(t -> t.f0).collect(Collectors.toSet())));
+		assertThat(dualKeyMap.keySetB(), Matchers.equalTo(keys.stream().map(t -> t.f1).collect(Collectors.toSet())));
+	}
+}


[2/6] flink git commit: [FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor

Posted by tr...@apache.org.
[FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor

This commit introduces the JobExecutor interface which abstracts the actual mini cluster
from the Test(Stream)Environment. By letting the Flip-6 MiniCluster as well as the
FlinkMiniCluster implement this interface, we can run all test base jobs either on the
Flip-6 mini cluster or on the current mini cluster.

This closes #4897.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/057edf9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/057edf9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/057edf9e

Branch: refs/heads/master
Commit: 057edf9e28b656401b985069ebcc428ab55e5fed
Parents: 51a2787
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 8 18:23:27 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 12:30:30 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/JobExecutor.java  | 38 ++++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  5 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  | 14 +++++++-
 .../runtime/minicluster/MiniClusterITCase.java  |  2 +-
 .../Flip6LocalStreamEnvironment.java            |  2 +-
 .../streaming/util/TestStreamEnvironment.java   | 31 ++++++++--------
 .../apache/flink/test/util/TestEnvironment.java | 36 ++++++++++---------
 7 files changed, 91 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
new file mode 100644
index 0000000..4991c2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/** Interface for {@link JobGraph} executors. */
+public interface JobExecutor {
+
+	/**
+	 * Run the given job and block until its execution result can be returned.
+	 *
+	 * @param jobGraph to execute
+	 * @return Execution result of the executed job
+	 * @throws JobExecutionException if the job failed to execute
+	 * @throws InterruptedException declared for the blocking call; presumably thrown if the calling
+	 *     thread is interrupted before the result is available (TODO confirm per implementation)
+	 */
+	JobExecutionResult executeJobBlocking(final JobGraph jobGraph) throws JobExecutionException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2bbd2c7..2598a60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-public class MiniCluster {
+public class MiniCluster implements JobExecutor {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
 
@@ -448,7 +448,8 @@ public class MiniCluster {
 	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 	 *         or if the job terminally failed.
 	 */
-	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+	@Override
+	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
 		MiniClusterJobDispatcher dispatcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5554061..cc8ae5f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -64,7 +64,8 @@ abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
     val highAvailabilityServices: HighAvailabilityServices,
     val useSingleActorSystem: Boolean)
-  extends LeaderRetrievalListener {
+  extends LeaderRetrievalListener
+  with JobExecutor {
 
   protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
 
@@ -701,4 +702,15 @@ abstract class FlinkMiniCluster(
 
     FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader)
   }
+
+  /**
+    * Run the given job and block until its execution result can be returned.
+    *
+    * @param jobGraph to execute
+    * @return Execution result of the executed job
+    * @throws JobExecutionException if the job failed to execute
+    */
+  override def executeJobBlocking(jobGraph: JobGraph) = {
+    submitJobAndWait(jobGraph, false)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f822e0c..6a4f678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -99,7 +99,7 @@ public class MiniClusterITCase extends TestLogger {
 
 	private static void executeJob(MiniCluster miniCluster) throws Exception {
 		JobGraph job = getSimpleJob();
-		miniCluster.runJobBlocking(job);
+		miniCluster.executeJobBlocking(job);
 	}
 
 	private static JobGraph getSimpleJob() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 8720e7a..9d4f2a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -107,7 +107,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		try {
 			miniCluster.start();
-			return miniCluster.runJobBlocking(jobGraph);
+			return miniCluster.executeJobBlocking(jobGraph);
 		}
 		finally {
 			transformations.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 86159a5..61615b8 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.JobExecutor;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
@@ -37,20 +38,20 @@ import java.util.Collections;
  */
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
-	/** The mini cluster in which this environment executes its jobs. */
-	private final LocalFlinkMiniCluster miniCluster;
+	/** The job executor used to execute this environment's jobs. */
+	private final JobExecutor jobExecutor;
 
 	private final Collection<Path> jarFiles;
 
 	private final Collection<URL> classPaths;
 
 	public TestStreamEnvironment(
-			LocalFlinkMiniCluster miniCluster,
+			JobExecutor jobExecutor,
 			int parallelism,
 			Collection<Path> jarFiles,
 			Collection<URL> classPaths) {
 
-		this.miniCluster = Preconditions.checkNotNull(miniCluster);
+		this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
 		this.jarFiles = Preconditions.checkNotNull(jarFiles);
 		this.classPaths = Preconditions.checkNotNull(classPaths);
 
@@ -58,9 +59,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	public TestStreamEnvironment(
-			LocalFlinkMiniCluster miniCluster,
+			JobExecutor jobExecutor,
 			int parallelism) {
-		this(miniCluster, parallelism, Collections.<Path>emptyList(), Collections.<URL>emptyList());
+		this(jobExecutor, parallelism, Collections.emptyList(), Collections.emptyList());
 	}
 
 	@Override
@@ -75,7 +76,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
 		jobGraph.setClasspaths(new ArrayList<>(classPaths));
 
-		return miniCluster.submitJobAndWait(jobGraph, false);
+		return jobExecutor.executeJobBlocking(jobGraph);
 	}
 
 	// ------------------------------------------------------------------------
@@ -85,13 +86,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	 * the given cluster with the given default parallelism and the specified jar files and class
 	 * paths.
 	 *
-	 * @param cluster The test cluster to run the test program on.
+	 * @param jobExecutor The executor to execute the jobs on
 	 * @param parallelism The default parallelism for the test programs.
 	 * @param jarFiles Additional jar files to execute the job with
 	 * @param classpaths Additional class paths to execute the job with
 	 */
 	public static void setAsContext(
-			final LocalFlinkMiniCluster cluster,
+			final JobExecutor jobExecutor,
 			final int parallelism,
 			final Collection<Path> jarFiles,
 			final Collection<URL> classpaths) {
@@ -100,7 +101,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 			@Override
 			public StreamExecutionEnvironment createExecutionEnvironment() {
 				return new TestStreamEnvironment(
-					cluster,
+					jobExecutor,
 					parallelism,
 					jarFiles,
 					classpaths);
@@ -114,15 +115,15 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
 	 * the given cluster with the given default parallelism.
 	 *
-	 * @param cluster The test cluster to run the test program on.
+	 * @param jobExecutor The executor to execute the jobs on
 	 * @param parallelism The default parallelism for the test programs.
 	 */
-	public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
+	public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
 		setAsContext(
-			cluster,
+			jobExecutor,
 			parallelism,
-			Collections.<Path>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index ac5b17d..1d82f87 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -30,6 +31,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.JobExecutor;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Preconditions;
 
@@ -44,7 +46,7 @@ import java.util.Collections;
  */
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final LocalFlinkMiniCluster miniCluster;
+	private final JobExecutor jobExecutor;
 
 	private final Collection<Path> jarFiles;
 
@@ -53,12 +55,12 @@ public class TestEnvironment extends ExecutionEnvironment {
 	private TestEnvironment lastEnv;
 
 	public TestEnvironment(
-			LocalFlinkMiniCluster miniCluster,
+			JobExecutor jobExecutor,
 			int parallelism,
 			boolean isObjectReuseEnabled,
 			Collection<Path> jarFiles,
 			Collection<URL> classPaths) {
-		this.miniCluster = Preconditions.checkNotNull(miniCluster);
+		this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
 		this.jarFiles = Preconditions.checkNotNull(jarFiles);
 		this.classPaths = Preconditions.checkNotNull(classPaths);
 
@@ -77,15 +79,15 @@ public class TestEnvironment extends ExecutionEnvironment {
 	}
 
 	public TestEnvironment(
-			LocalFlinkMiniCluster executor,
+			JobExecutor executor,
 			int parallelism,
 			boolean isObjectReuseEnabled) {
 		this(
 			executor,
 			parallelism,
 			isObjectReuseEnabled,
-			Collections.<Path>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 	}
 
 	@Override
@@ -115,7 +117,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 
 		jobGraph.setClasspaths(new ArrayList<>(classPaths));
 
-		this.lastJobExecutionResult = miniCluster.submitJobAndWait(jobGraph, false);
+		this.lastJobExecutionResult = jobExecutor.executeJobBlocking(jobGraph);
 		return this.lastJobExecutionResult;
 	}
 
@@ -130,7 +132,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 	private OptimizedPlan compileProgram(String jobName) {
 		Plan p = createProgramPlan(jobName);
 
-		Optimizer pc = new Optimizer(new DataStatistics(), this.miniCluster.configuration());
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
 		return pc.compile(p);
 	}
 
@@ -138,7 +140,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {
-				lastEnv = new TestEnvironment(miniCluster, getParallelism(), getConfig().isObjectReuseEnabled());
+				lastEnv = new TestEnvironment(jobExecutor, getParallelism(), getConfig().isObjectReuseEnabled());
 				return lastEnv;
 			}
 		};
@@ -153,13 +155,13 @@ public class TestEnvironment extends ExecutionEnvironment {
 	 * environment executes the given jobs on a Flink mini cluster with the given default
 	 * parallelism and the additional jar files and class paths.
 	 *
-	 * @param miniCluster The mini cluster on which to execute the jobs
+	 * @param jobExecutor The executor to run the jobs on
 	 * @param parallelism The default parallelism
 	 * @param jarFiles Additional jar files to execute the job with
 	 * @param classPaths Additional class paths to execute the job with
 	 */
 	public static void setAsContext(
-		final LocalFlinkMiniCluster miniCluster,
+		final JobExecutor jobExecutor,
 		final int parallelism,
 		final Collection<Path> jarFiles,
 		final Collection<URL> classPaths) {
@@ -168,7 +170,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {
 				return new TestEnvironment(
-					miniCluster,
+					jobExecutor,
 					parallelism,
 					false,
 					jarFiles,
@@ -185,15 +187,15 @@ public class TestEnvironment extends ExecutionEnvironment {
 	 * environment executes the given jobs on a Flink mini cluster with the given default
 	 * parallelism and the additional jar files and class paths.
 	 *
-	 * @param miniCluster The mini cluster on which to execute the jobs
+	 * @param jobExecutor The executor to run the jobs on
 	 * @param parallelism The default parallelism
 	 */
-	public static void setAsContext(final LocalFlinkMiniCluster miniCluster, final int parallelism) {
+	public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
 		setAsContext(
-			miniCluster,
+			jobExecutor,
 			parallelism,
-			Collections.<Path>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 	}
 
 	public static void unsetAsContext() {


[6/6] flink git commit: [hotfix] Refactor JobMasterTest to avoid using Mockito

Posted by tr...@apache.org.
[hotfix] Refactor JobMasterTest to avoid using Mockito


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4afd445b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4afd445b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4afd445b

Branch: refs/heads/master
Commit: 4afd445b36f3218d23e1c44e73868a4b12c9be52
Parents: 91f2a8d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 16:44:59 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 140 +++++++++++--------
 .../utils/TestingResourceManagerGateway.java    |  23 +++
 .../TestingTaskExecutorGateway.java             |  26 +++-
 3 files changed, 127 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 1f94419..e4f9fc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,17 +18,19 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -37,32 +39,27 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.net.InetAddress;
 import java.net.URL;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import static org.mockito.Mockito.*;
+import static org.hamcrest.MatcherAssert.assertThat;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(BlobLibraryCacheManager.class)
 @Category(Flip6.class)
 public class JobMasterTest extends TestLogger {
 
@@ -75,7 +72,7 @@ public class JobMasterTest extends TestLogger {
 			null,
 			null);
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final String jobManagerAddress = "jm";
@@ -83,9 +80,14 @@ public class JobMasterTest extends TestLogger {
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 
 		final String taskManagerAddress = "tm";
-		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
-		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+
+		final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
+		final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+
+		taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
+		taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
 
 		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
@@ -93,7 +95,7 @@ public class JobMasterTest extends TestLogger {
 		final long heartbeatInterval = 1L;
 		final long heartbeatTimeout = 5L;
 
-		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor();
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
 		final JobGraph jobGraph = new JobGraph();
@@ -115,10 +117,10 @@ public class JobMasterTest extends TestLogger {
 					blobServer,
 					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 					new String[0]),
-				mock(RestartStrategyFactory.class),
+				new NoRestartStrategy.NoRestartStrategyFactory(),
 				testingTimeout,
 				null,
-				mock(OnCompletionActions.class),
+				new NoOpOnCompletionActions(),
 				testingFatalErrorHandler,
 				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
 				null,
@@ -138,29 +140,15 @@ public class JobMasterTest extends TestLogger {
 			// wait for the completion of the registration
 			registrationResponse.get();
 
-			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
-				heartbeatRunnableCaptor.capture(),
-				eq(0L),
-				eq(heartbeatInterval),
-				eq(TimeUnit.MILLISECONDS));
-
-			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
-
-			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(scheduledExecutor, timeout(testingTimeout.toMilliseconds())).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
-
-			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
+			System.out.println("foobar");
 
-			// run the first heartbeat request
-			heartbeatRunnable.run();
+			final ResourceID heartbeatResourceId = heartbeatResourceIdFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-			verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId));
+			assertThat(heartbeatResourceId, Matchers.equalTo(jmResourceId));
 
-			// run the timeout runnable to simulate a heartbeat timeout
-			timeoutRunnable.run();
+			final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-			verify(taskExecutorGateway, timeout(testingTimeout.toMilliseconds())).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+			assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -185,24 +173,33 @@ public class JobMasterTest extends TestLogger {
 			null,
 			null);
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+		final TestingRpcService rpc = new TestingRpcService();
 
 		final long heartbeatInterval = 1L;
 		final long heartbeatTimeout = 5L;
-		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor();
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
-		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway.registerJobManager(
-			any(JobMasterId.class),
-			any(ResourceID.class),
-			anyString(),
-			any(JobID.class),
-			any(Time.class)
-		)).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
-			heartbeatInterval, resourceManagerId, rmResourceId)));
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
+			resourceManagerId,
+			rmResourceId,
+			heartbeatInterval,
+			"localhost",
+			"localhost");
+
+		final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
+		final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+
+		resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> jobManagerRegistrationFuture.complete(
+			Tuple3.of(
+				tuple.f0,
+				tuple.f1,
+				tuple.f3)));
+
+		resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
 
-		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -224,10 +221,10 @@ public class JobMasterTest extends TestLogger {
 					blobServer,
 					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 					new String[0]),
-				mock(RestartStrategyFactory.class),
+				new NoRestartStrategy.NoRestartStrategyFactory(),
 				testingTimeout,
 				null,
-				mock(OnCompletionActions.class),
+				new NoOpOnCompletionActions(),
 				testingFatalErrorHandler,
 				FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
 				null,
@@ -242,15 +239,18 @@ public class JobMasterTest extends TestLogger {
 			rmLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerId.toUUID());
 
 			// register job manager success will trigger monitor heartbeat target between jm and rm
-			verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager(
-				eq(jobMasterId),
-				eq(jmResourceId),
-				anyString(),
-				eq(jobGraph.getJobID()),
-				any(Time.class));
+			final Tuple3<JobMasterId, ResourceID, JobID> registrationInformation = jobManagerRegistrationFuture.get(
+				testingTimeout.toMilliseconds(),
+				TimeUnit.MILLISECONDS);
+
+			assertThat(registrationInformation.f0, Matchers.equalTo(jobMasterId));
+			assertThat(registrationInformation.f1, Matchers.equalTo(jmResourceId));
+			assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));
+
+			final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			// heartbeat timeout should trigger disconnect JobManager from ResourceManager
-			verify(resourceManagerGateway, timeout(heartbeatTimeout * 50L)).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+			assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -260,5 +260,25 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * No op implementation of {@link OnCompletionActions}.
+	 */
+	private static final class NoOpOnCompletionActions implements OnCompletionActions {
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 0f8b5fa..dc1635a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -68,6 +69,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private volatile Consumer<SlotRequest> requestSlotConsumer;
 
+	private volatile Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> registerJobManagerConsumer;
+
+	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+
 	public TestingResourceManagerGateway() {
 		this(
 			ResourceManagerId.generate(),
@@ -105,8 +110,22 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.requestSlotConsumer = slotRequestConsumer;
 	}
 
+	public void setRegisterJobManagerConsumer(Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> registerJobManagerConsumer) {
+		this.registerJobManagerConsumer = registerJobManagerConsumer;
+	}
+
+	public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
+		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
+		final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
+
+		if (currentConsumer != null) {
+			currentConsumer.accept(Tuple4.of(jobMasterId, jobMasterResourceId, jobMasterAddress, jobId));
+		}
+
 		return CompletableFuture.completedFuture(
 			new JobMasterRegistrationSuccess(
 				heartbeatInterval,
@@ -191,7 +210,11 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
+		final Consumer<Tuple2<JobID, Throwable>> currentConsumer = disconnectJobManagerConsumer;
 
+		if (currentConsumer != null) {
+			currentConsumer.accept(Tuple2.of(jobId, cause));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index c6334c5..aa6c872 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Simple {@link TaskExecutorGateway} implementation for testing purposes.
@@ -43,6 +45,10 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final String hostname;
 
+	private volatile Consumer<ResourceID> heartbeatJobManagerConsumer;
+
+	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+
 	public TestingTaskExecutorGateway() {
 		this("foobar:1234", "foobar");
 	}
@@ -52,6 +58,14 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 		this.hostname = Preconditions.checkNotNull(hostname);
 	}
 
+	public void setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
+		this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+	}
+
+	public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
+		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+	}
+
 	@Override
 	public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
 		return CompletableFuture.completedFuture(Acknowledge.get());
@@ -94,7 +108,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
-		// noop
+		final Consumer<ResourceID> currentHeartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+
+		if (currentHeartbeatJobManagerConsumer != null) {
+			currentHeartbeatJobManagerConsumer.accept(heartbeatOrigin);
+		}
 	}
 
 	@Override
@@ -104,7 +122,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
-		// noop
+		final Consumer<Tuple2<JobID, Throwable>> currentDisconnectJobManagerConsumer = disconnectJobManagerConsumer;
+
+		if (currentDisconnectJobManagerConsumer != null) {
+			currentDisconnectJobManagerConsumer.accept(Tuple2.of(jobId, cause));
+		}
 	}
 
 	@Override


[4/6] flink git commit: [hotfix] Enable checkpointing RPC calls

Posted by tr...@apache.org.
[hotfix] Enable checkpointing RPC calls


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79854697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79854697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79854697

Branch: refs/heads/master
Commit: 79854697d130b9650b2aa679d263b239a2c10521
Parents: 9541afd
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 15:10:46 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/RpcTaskManagerGateway.java | 13 ++++++++-----
 .../runtime/taskexecutor/TaskExecutorGateway.java      |  4 ++--
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79854697/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 83b8999..f1d991d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
@@ -37,7 +38,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Implementation of the {@link TaskManagerGateway} for Flink's RPC system
+ * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
  */
 public class RpcTaskManagerGateway implements TaskManagerGateway {
 
@@ -112,14 +113,16 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
-//		taskExecutorGateway.notifyCheckpointComplete(executionAttemptID, jobId, checkpointId, timestamp);
-		throw new UnsupportedOperationException("Operation is not yet supported.");
+		taskExecutorGateway.confirmCheckpoint(executionAttemptID, checkpointId, timestamp);
 	}
 
 	@Override
 	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-//		taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
-		throw new UnsupportedOperationException("Operation is not yet supported.");
+		taskExecutorGateway.triggerCheckpoint(
+			executionAttemptID,
+			checkpointId,
+			timestamp,
+			checkpointOptions);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79854697/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 3dc80b6..482d6fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -37,12 +37,12 @@ import org.apache.flink.runtime.taskmanager.Task;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link TaskExecutor} RPC gateway interface
+ * {@link TaskExecutor} RPC gateway interface.
  */
 public interface TaskExecutorGateway extends RpcGateway {
 
 	/**
-	 * Requests a slot from the TaskManager
+	 * Requests a slot from the TaskManager.
 	 *
 	 * @param slotId slot id for the request
 	 * @param jobId for which to request a slot


[5/6] flink git commit: [hotfix] Add JavaDocs to OnCompletionActions

Posted by tr...@apache.org.
[hotfix] Add JavaDocs to OnCompletionActions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f2a8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f2a8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f2a8d9

Branch: refs/heads/master
Commit: 91f2a8d9b6bafae5855254977d1f38e1b89f2389
Parents: 7985469
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 09:28:34 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/OnCompletionActions.java    | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91f2a8d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 25a2a66..17167f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -20,11 +20,28 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobExecutionResult;
 
+/**
+ * Interface for completion actions once a Flink job has reached
+ * a terminal state.
+ */
 public interface OnCompletionActions {
 
+	/**
+	 * Job finished successfully.
+	 *
+	 * @param result of the job execution
+	 */
 	void jobFinished(JobExecutionResult result);
 
+	/**
+	 * Job failed with the given exception.
+	 *
+	 * @param cause of the job failure
+	 */
 	void jobFailed(Throwable cause);
 
+	/**
+	 * Job was finished by another JobMaster.
+	 */
 	void jobFinishedByOther();
 }


[3/6] flink git commit: [FLINK-8389] [flip6] Release all slots upon closing of JobManager

Posted by tr...@apache.org.
[FLINK-8389] [flip6] Release all slots upon closing of JobManager

This closes #5265.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9541afd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9541afd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9541afd2

Branch: refs/heads/master
Commit: 9541afd2c53fc55cee7f0f45b4e16377803f1388
Parents: ff67094
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 15:02:09 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:19 2018 +0100

----------------------------------------------------------------------
 .../slots/ActorTaskManagerGateway.java          |   6 +
 .../jobmanager/slots/TaskManagerGateway.java    |  15 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  90 +++++++----
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   9 ++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 126 ++++++++++-----
 .../jobmaster/slotpool/SlotPoolGateway.java     |   4 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 137 +++++++----------
 .../taskexecutor/TaskExecutorGateway.java       |  13 ++
 .../runtime/taskexecutor/slot/TaskSlot.java     |  30 ++--
 .../taskexecutor/slot/TaskSlotTable.java        |  26 ++--
 .../runtime/taskexecutor/slot/TimerService.java |   9 +-
 .../ExecutionGraphRestartTest.java              |  14 +-
 .../utils/SimpleAckingTaskManagerGateway.java   |  39 +++--
 .../scheduler/DummyScheduledUnit.java           |  34 +++++
 .../jobmaster/slotpool/SlotPoolTest.java        | 152 +++++++++++++------
 .../TestingTaskExecutorGateway.java             |   5 +
 .../src/test/resources/log4j-test.properties    |  38 +++++
 19 files changed, 498 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index dc5d8c0..6b88752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -218,6 +219,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		throw new UnsupportedOperationException("The old TaskManager does not support freeing slots");
+	}
+
 	private CompletableFuture<TransientBlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
 		Preconditions.checkNotNull(request);
 		Preconditions.checkNotNull(timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 682441a..b2aca32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -186,4 +188,17 @@ public interface TaskManagerGateway {
 	 * @return Future blob key under which the task manager stdout file has been stored
 	 */
 	CompletableFuture<TransientBlobKey> requestTaskManagerStdout(final Time timeout);
+
+	/**
+	 * Frees the slot with the given allocation ID.
+	 *
+	 * @param allocationId identifying the slot to free
+	 * @param cause of the freeing operation
+	 * @param timeout for the operation
+	 * @return Future acknowledge which is returned once the slot has been freed
+	 */
+	CompletableFuture<Acknowledge> freeSlot(
+		final AllocationID allocationId,
+		final Throwable cause,
+		@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7a2844d..b81a8c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,8 +55,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -65,6 +63,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -110,13 +111,16 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,8 +144,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// ------------------------------------------------------------------------
 
-	private final JobMasterGateway selfGateway;
-
 	private final ResourceID resourceId;
 
 	/** Logical representation of the job. */
@@ -226,9 +228,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			@Nullable String restAddress,
 			@Nullable String metricQueryServicePath) throws Exception {
 
-		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
 
-		selfGateway = getSelfGateway(JobMasterGateway.class);
+		final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);
 
 		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);
@@ -362,13 +364,47 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 */
 	@Override
 	public void postStop() throws Exception {
+		log.info("Stopping the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
+
+		// disconnect from all registered TaskExecutors
+		final Set<ResourceID> taskManagerResourceIds = new HashSet<>(registeredTaskManagers.keySet());
+		final FlinkException cause = new FlinkException("Stopping JobMaster for job " + jobGraph.getName() +
+			'(' + jobGraph.getJobID() + ").");
+
+		for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
+			disconnectTaskManager(taskManagerResourceId, cause);
+		}
+
 		taskManagerHeartbeatManager.stop();
 		resourceManagerHeartbeatManager.stop();
 
 		// make sure there is a graceful exit
 		suspendExecution(new Exception("JobManager is shutting down."));
 
-		super.postStop();
+		// shut down will internally release all registered slots
+		slotPool.shutDown();
+		CompletableFuture<Boolean> terminationFuture = slotPool.getTerminationFuture();
+
+		Exception exception = null;
+
+		// wait for the slot pool shut down
+		try {
+			terminationFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			exception = e;
+		}
+
+		try {
+			super.postStop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+
+		log.info("Stopped the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -507,9 +543,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
+	public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
 		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-		slotPoolGateway.releaseTaskManager(resourceID);
+		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
 
@@ -517,6 +553,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
 		}
 
+		return releaseFuture;
 	}
 
 	// TODO: This method needs a leader session ID
@@ -537,14 +574,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			checkpointState);
 
 		if (checkpointCoordinator != null) {
-			getRpcService().execute(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
-					} catch (Throwable t) {
-						log.warn("Error while processing checkpoint acknowledgement message");
-					}
+			getRpcService().execute(() -> {
+				try {
+					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+				} catch (Throwable t) {
+					log.warn("Error while processing checkpoint acknowledgement message");
 				}
 			});
 		} else {
@@ -1138,12 +1172,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					resourceManagerResourceID = success.getResourceManagerResourceId();
-					establishResourceManagerConnection(success);
-				}
+			runAsync(() -> {
+				resourceManagerResourceID = success.getResourceManagerResourceId();
+				establishResourceManagerConnection(success);
 			});
 		}
 
@@ -1209,15 +1240,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+			runAsync(() -> {
+				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-				}
+				closeResourceManagerConnection(
+					new TimeoutException(
+						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 09d995e..0d896ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -131,8 +131,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
 	 * @param cause for the disconnection of the TaskManager
+	 * @return Future acknowledge which is completed once the JobMaster has been disconnected from the TaskManager
 	 */
-	void disconnectTaskManager(ResourceID resourceID, Exception cause);
+	CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause);
 
 	/**
 	 * Disconnects the resource manager from the job manager because of the given cause.

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 849a163..83b8999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -132,4 +133,12 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 //		return taskExecutorGateway.requestTaskManagerStdout(timeout);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		return taskExecutorGateway.freeSlot(
+			allocationId,
+			cause,
+			timeout);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 996e445..a56335c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -51,9 +51,6 @@ import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.util.Collection;
@@ -89,16 +86,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
 
-	/** The log for the pool - shared also with the internal classes. */
-	static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
-
 	// ------------------------------------------------------------------------
 
 	private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
 
 	private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
 
-	private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+	private static final Time DEFAULT_TIMEOUT = Time.seconds(10);
 
 	// ------------------------------------------------------------------------
 
@@ -121,8 +115,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
-	/** Timeout for request calls to the ResourceManager. */
-	private final Time resourceManagerRequestsTimeout;
+	/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
+	private final Time timeout;
 
 	/** Timeout for allocation round trips (RM -> launch TM -> offer slot). */
 	private final Time resourceManagerAllocationTimeout;
@@ -143,8 +137,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	// ------------------------------------------------------------------------
 
 	public SlotPool(RpcService rpcService, JobID jobId) {
-		this(rpcService, jobId, SystemClock.getInstance(),
-				DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+		this(
+			rpcService,
+			jobId,
+			SystemClock.getInstance(),
+			DEFAULT_SLOT_REQUEST_TIMEOUT,
+			DEFAULT_RM_ALLOCATION_TIMEOUT,
+			DEFAULT_TIMEOUT);
 	}
 
 	public SlotPool(
@@ -159,10 +158,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		this.jobId = checkNotNull(jobId);
 		this.clock = checkNotNull(clock);
-		this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
+		this.timeout = checkNotNull(resourceManagerRequestTimeout);
 		this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
 
-		this.registeredTaskManagers = new HashSet<>();
+		this.registeredTaskManagers = new HashSet<>(16);
 		this.allocatedSlots = new AllocatedSlots();
 		this.availableSlots = new AvailableSlots();
 		this.pendingRequests = new DualKeyMap<>(16);
@@ -204,6 +203,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		}
 	}
 
+	@Override
+	public void postStop() throws Exception {
+		// cancel all pending allocations
+		Set<AllocationID> allocationIds = pendingRequests.keySetB();
+
+		for (AllocationID allocationId : allocationIds) {
+			resourceManagerGateway.cancelSlotRequest(allocationId);
+		}
+
+		// release all registered slots by releasing the corresponding TaskExecutors
+		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
+			releaseTaskManagerInternal(taskManagerResourceId);
+		}
+
+		clear();
+
+		super.postStop();
+	}
+
 	/**
 	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 	 */
@@ -220,9 +238,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		// Clear (but not release!) the available slots. The TaskManagers should re-register them
 		// at the new leader JobManager/SlotPool
-		availableSlots.clear();
-		allocatedSlots.clear();
-		pendingRequests.clear();
+		clear();
 	}
 
 	// ------------------------------------------------------------------------
@@ -644,7 +660,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		Preconditions.checkNotNull(resourceManagerGateway);
 		Preconditions.checkNotNull(pendingRequest);
 
-		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+		log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
 
 		final AllocationID allocationId = new AllocationID();
 
@@ -660,7 +676,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
 			jobMasterId,
 			new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
-			resourceManagerRequestsTimeout);
+			timeout);
 
 		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
 			(Acknowledge value) -> {
@@ -695,8 +711,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+			if (log.isDebugEnabled()) {
+				log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
 			}
 		}
 	}
@@ -710,7 +726,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
-		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+		log.info("Cannot serve slot request, no ResourceManager connected. " +
 				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
 
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
@@ -720,7 +736,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			public void run() {
 				checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
 			}
-		}, resourceManagerRequestsTimeout);
+		}, timeout);
 	}
 
 	private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) {
@@ -802,7 +818,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		Preconditions.checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+			log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -833,13 +849,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
 		if (pendingRequest != null) {
-			LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+			log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
 				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
 			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
 			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		} else {
-			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+			log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
 			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 	}
@@ -932,14 +948,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final AllocationID allocationID = slotOffer.getAllocationId();
 
 		if (!registeredTaskManagers.contains(resourceID)) {
-			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+			log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
 					slotOffer.getAllocationId(), taskManagerLocation);
 			return CompletableFuture.completedFuture(false);
 		}
 
 		// check whether we have already using this slot
 		if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
-			LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
+			log.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
 
 			// return true here so that the sender will get a positive acknowledgement to the retry
 			// and mark the offering as a success
@@ -1003,7 +1019,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			failPendingRequest(pendingRequest, cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
-			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+			log.debug("Failed available slot [{}] with ", allocationID, cause);
 		}
 		else {
 			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1013,7 +1029,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+				log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1041,21 +1057,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
 	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
 	 *
-	 * @param resourceID The id of the TaskManager
+	 * @param resourceId The id of the TaskManager
 	 */
 	@Override
-	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
-		if (registeredTaskManagers.remove(resourceID)) {
-			availableSlots.removeAllForTaskManager(resourceID);
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+		if (registeredTaskManagers.remove(resourceId)) {
+			releaseTaskManagerInternal(resourceId);
+		}
 
-			final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
 
-			for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
-				allocatedSlot.releasePayload(new FlinkException("TaskManager " + resourceID + " was released."));
-			}
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private void releaseTaskManagerInternal(final ResourceID resourceId) {
+		final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
+
+		final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
+
+		for (AllocatedSlot allocatedSlot : removedSlots) {
+			allocatedSlot.releasePayload(cause);
 		}
 
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		removedSlots.addAll(availableSlots.removeAllForTaskManager(resourceId));
+
+		for (AllocatedSlot removedSlot : removedSlots) {
+			TaskManagerGateway taskManagerGateway = removedSlot.getTaskManagerGateway();
+			taskManagerGateway.freeSlot(removedSlot.getAllocationId(), cause, timeout);
+		}
+	}
+
+	/**
+	 * Clear the internal state of the SlotPool.
+	 */
+	private void clear() {
+		availableSlots.clear();
+		allocatedSlots.clear();
+		pendingRequests.clear();
+		waitingForResourceManager.clear();
+		registeredTaskManagers.clear();
+		slotSharingManagers.clear();
 	}
 
 	// ------------------------------------------------------------------------
@@ -1363,8 +1406,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		 * Remove all available slots come from specified TaskManager.
 		 *
 		 * @param taskManager The id of the TaskManager
+		 * @return The set of removed slots for the given TaskManager
 		 */
-		void removeAllForTaskManager(final ResourceID taskManager) {
+		Set<AllocatedSlot> removeAllForTaskManager(final ResourceID taskManager) {
 			// remove from the by-TaskManager view
 			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
 
@@ -1381,6 +1425,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				if (slotsForHost.isEmpty()) {
 					availableSlotsByHost.remove(host);
 				}
+
+				return slotsForTm;
+			} else {
+				return Collections.emptySet();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 7a627b4..0df6262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -84,10 +84,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	/**
 	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
 	 *
-	 * @param resourceID identifying the TaskExecutor which shall be released from the SlotPool
+	 * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
 	 * @return Future acknowledge which is completed after the TaskExecutor has been released
 	 */
-	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
+	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
 
 	/**
 	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 20dcfa9..77737e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -356,8 +356,7 @@ public class JobLeaderService {
 	 * Retrying registration for the job manager <--> task manager connection.
 	 */
 	private static final class JobManagerRetryingRegistration
-			extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess>
-	{
+			extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
 
 		private final String taskManagerRpcAddress;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a348948..5577472 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -113,34 +113,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	public static final String TASK_MANAGER_NAME = "taskmanager";
 
-	/** The connection information of this task manager */
+	/** The connection information of this task manager. */
 	private final TaskManagerLocation taskManagerLocation;
 
-	/** Max blob port which is accepted */
+	/** Max blob port which is accepted. */
 	public static final int MAX_BLOB_PORT = 65536;
 
-	/** The access to the leader election and retrieval services */
+	/** The access to the leader election and retrieval services. */
 	private final HighAvailabilityServices haServices;
 
-	/** The task manager configuration */
+	/** The task manager configuration. */
 	private final TaskManagerConfiguration taskManagerConfiguration;
 
-	/** The I/O manager component in the task manager */
+	/** The I/O manager component in the task manager. */
 	private final IOManager ioManager;
 
-	/** The memory manager component in the task manager */
+	/** The memory manager component in the task manager. */
 	private final MemoryManager memoryManager;
 
-	/** The network component in the task manager */
+	/** The network component in the task manager. */
 	private final NetworkEnvironment networkEnvironment;
 
-	/** The heartbeat manager for job manager in the task manager */
+	/** The heartbeat manager for job manager in the task manager. */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
-	/** The heartbeat manager for resource manager in the task manager */
+	/** The heartbeat manager for resource manager in the task manager. */
 	private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
 
-	/** The fatal error handler to use in case of a fatal error */
+	/** The fatal error handler to use in case of a fatal error. */
 	private final FatalErrorHandler fatalErrorHandler;
 
 	private final TaskManagerMetricGroup taskManagerMetricGroup;
@@ -673,6 +673,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		freeSlotInternal(allocationId, cause);
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
 	// ----------------------------------------------------------------------
 	// Disconnection RPCs
 	// ----------------------------------------------------------------------
@@ -820,7 +827,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 								// We encountered an exception. Free the slots and return them to the RM.
 								for (SlotOffer reservedSlot: reservedSlots) {
-									freeSlot(reservedSlot.getAllocationId(), throwable);
+									freeSlotInternal(reservedSlot.getAllocationId(), throwable);
 								}
 							}
 						} else {
@@ -834,7 +841,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 								final Exception e = new Exception("The slot was rejected by the JobManager.");
 
 								for (SlotOffer rejectedSlot : reservedSlots) {
-									freeSlot(rejectedSlot.getAllocationId(), e);
+									freeSlotInternal(rejectedSlot.getAllocationId(), e);
 								}
 							} else {
 								// discard the response since there is a new leader for the job
@@ -898,20 +905,24 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		// 1. fail tasks running under this JobID
 		Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
 
+		final FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId +
+			" lost the leadership.", cause);
+
 		while (tasks.hasNext()) {
-			tasks.next().failExternally(new Exception("JobManager responsible for " + jobId +
-				" lost the leadership."));
+			tasks.next().failExternally(failureCause);
 		}
 
 		// 2. Move the active slots to state allocated (possible to time out again)
 		Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
 
+		final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
+
 		while (activeSlots.hasNext()) {
 			AllocationID activeSlot = activeSlots.next();
 
 			try {
 				if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
-					freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
+					freeSlotInternal(activeSlot, freeingCause);
 				}
 			} catch (SlotNotFoundException e) {
 				log.debug("Could not mark the slot {} inactive.", jobId, e);
@@ -1017,8 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void updateTaskExecutionState(
 			final JobMasterGateway jobMasterGateway,
-			final TaskExecutionState taskExecutionState)
-	{
+			final TaskExecutionState taskExecutionState) {
 		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
 		CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
@@ -1065,7 +1075,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private void freeSlot(AllocationID allocationId, Throwable cause) {
+	private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
 		Preconditions.checkNotNull(allocationId);
 
 		try {
@@ -1085,8 +1095,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private void freeSlot(AllocationID allocationId) {
-		freeSlot(allocationId, new Exception("The slot " + allocationId + " is beeing freed."));
+	private void freeSlotInternal(AllocationID allocationId) {
+		freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed."));
 	}
 
 	private void timeoutSlot(AllocationID allocationId, UUID ticket) {
@@ -1094,7 +1104,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Preconditions.checkNotNull(ticket);
 
 		if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
-			freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out."));
+			freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
 		} else {
 			log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
 		}
@@ -1196,14 +1206,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
 			log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
 
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					closeJobManagerConnection(
-						jobId,
-						new Exception("Job leader for job id " + jobId + " lost leadership."));
-				}
-			});
+			runAsync(() ->
+				closeJobManagerConnection(
+					jobId,
+					new Exception("Job leader for job id " + jobId + " lost leadership.")));
 		}
 
 		@Override
@@ -1219,12 +1225,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			final ResourceID resourceManagerId = success.getResourceManagerId();
 
 			runAsync(
-				new Runnable() {
-					@Override
-					public void run() {
-						establishResourceManagerConnection(resourceManagerId);
-					}
-				}
+				() -> establishResourceManagerConnection(resourceManagerId)
 			);
 		}
 
@@ -1243,12 +1244,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
-				}
-			});
+			runAsync(() -> unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID));
 		}
 
 		@Override
@@ -1263,12 +1259,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.failTask(executionAttemptID, cause);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
 		}
 
 		@Override
@@ -1281,22 +1272,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void freeSlot(final AllocationID allocationId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.freeSlot(allocationId);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId));
 		}
 
 		@Override
 		public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.timeoutSlot(allocationId, ticket);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
 		}
 	}
 
@@ -1304,19 +1285,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
-
-					if (jobManagerConnections.containsKey(resourceID)) {
-						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
-
-						if (jobManagerConnection != null) {
-							closeJobManagerConnection(
-								jobManagerConnection.getJobID(),
-								new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
-						}
+			runAsync(() -> {
+				log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
+
+				if (jobManagerConnections.containsKey(resourceID)) {
+					JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+
+					if (jobManagerConnection != null) {
+						closeJobManagerConnection(
+							jobManagerConnection.getJobID(),
+							new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
 					}
 				}
 			});
@@ -1337,15 +1315,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
-
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-				}
+			runAsync(() -> {
+				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+
+				closeResourceManagerConnection(
+					new TimeoutException(
+						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ee0f69d..3dc80b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -162,4 +162,17 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param cause for the disconnection from the ResourceManager
 	 */
 	void disconnectResourceManager(Exception cause);
+
+	/**
+	 * Frees the slot with the given allocation ID.
+	 *
+	 * @param allocationId identifying the slot to free
+	 * @param cause of the freeing operation
+	 * @param timeout for the operation
+	 * @return Future acknowledge which is returned once the slot has been freed
+	 */
+	CompletableFuture<Acknowledge> freeSlot(
+		final AllocationID allocationId,
+		final Throwable cause,
+		@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 6f5230c..411aa94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -38,34 +38,34 @@ import java.util.Map;
  *     <li>Allocated - The slot has been allocated for a job.</li>
  *     <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li>
  * </ul>
- * <p>
- * A task slot can only be allocated if it is in state free. An allocated task slot can transition
+ *
+ * <p>A task slot can only be allocated if it is in state free. An allocated task slot can transition
  * to state active.
- *<p>
- * An active slot allows to add tasks from the respective job and with the correct allocation id.
+ *
+ * <p>An active slot allows tasks to be added from the respective job and with the correct allocation id.
  * An active slot can be marked as inactive which sets the state back to allocated.
- * <p>
- * An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
+ *
+ * <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then its state
  * can be set to releasing indicating that it can be freed once it becomes empty.
  */
 public class TaskSlot {
 
-	/** Index of the task slot */
+	/** Index of the task slot. */
 	private final int index;
 
-	/** Resource characteristics for this slot */
+	/** Resource characteristics for this slot. */
 	private final ResourceProfile resourceProfile;
 
-	/** Tasks running in this slot */
+	/** Tasks running in this slot. */
 	private final Map<ExecutionAttemptID, Task> tasks;
 
-	/** State of this slot */
+	/** State of this slot. */
 	private TaskSlotState state;
 
-	/** Job id to which the slot has been allocated; null if not allocated */
+	/** Job id to which the slot has been allocated; null if not allocated. */
 	private JobID jobId;
 
-	/** Allocation id of this slot; null if not allocated */
+	/** Allocation id of this slot; null if not allocated. */
 	private AllocationID allocationId;
 
 	TaskSlot(final int index, final ResourceProfile resourceProfile) {
@@ -151,7 +151,7 @@ public class TaskSlot {
 	 * task with the same execution attempt id added to the task slot. In this case, the method
 	 * returns true. Otherwise the task slot is left unchanged and false is returned.
 	 *
-	 * In case that the task slot state is not active an {@link IllegalStateException} is thrown.
+	 * <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.
 	 * In case that the task's job id and allocation id don't match with the job id and allocation
 	 * id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.
 	 *
@@ -199,7 +199,7 @@ public class TaskSlot {
 	 * or is already allocated/active for the given job and allocation id, then the method returns
 	 * true. Otherwise it returns false.
 	 *
-	 * A slot can only be allocated if it's current state is free.
+	 * <p>A slot can only be allocated if its current state is free.
 	 *
 	 * @param newJobId to allocate the slot for
 	 * @param newAllocationId to identify the slot allocation
@@ -230,7 +230,7 @@ public class TaskSlot {
 	/**
 	 * Mark this slot as active. A slot can only be marked active if it's in state allocated.
 	 *
-	 * The method returns true if the slot was set to active. Otherwise it returns false.
+	 * <p>The method returns true if the slot was set to active. Otherwise it returns false.
 	 *
 	 * @return True if the new state of the slot is active; otherwise false
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 1384336..62101e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,35 +48,35 @@ import java.util.UUID;
 /**
  * Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices
  * for faster access to tasks and sets of allocated slots.
- * <p>
- * The task slot table automatically registers timeouts for allocated slots which cannot be assigned
+ *
+ * <p>The task slot table automatically registers timeouts for allocated slots which cannot be assigned
  * to a job manager.
- * <p>
- * Before the task slot table can be used, it must be started via the {@link #start} method.
+ *
+ * <p>Before the task slot table can be used, it must be started via the {@link #start} method.
  */
 public class TaskSlotTable implements TimeoutListener<AllocationID> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
 
-	/** Timer service used to time out allocated slots */
+	/** Timer service used to time out allocated slots. */
 	private final TimerService<AllocationID> timerService;
 
-	/** The list of all task slots */
+	/** The list of all task slots. */
 	private final List<TaskSlot> taskSlots;
 
-	/** Mapping from allocation id to task slot */
+	/** Mapping from allocation id to task slot. */
 	private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
 
-	/** Mapping from execution attempt id to task and task slot */
+	/** Mapping from execution attempt id to task and task slot. */
 	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
 
-	/** Mapping from job id to allocated slots for a job */
+	/** Mapping from job id to allocated slots for a job. */
 	private final Map<JobID, Set<AllocationID>> slotsPerJob;
 
-	/** Interface for slot actions, such as freeing them or timing them out */
+	/** Interface for slot actions, such as freeing them or timing them out. */
 	private SlotActions slotActions;
 
-	/** Whether the table has been started */
+	/** Whether the table has been started. */
 	private boolean started;
 
 	public TaskSlotTable(
@@ -555,7 +556,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		private final Task task;
 		private final TaskSlot taskSlot;
 
-
 		private TaskSlotMapping(Task task, TaskSlot taskSlot) {
 			this.task = Preconditions.checkNotNull(task);
 			this.taskSlot = Preconditions.checkNotNull(taskSlot);
@@ -675,7 +675,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	}
 
 	/**
-	 * Iterator over all {@link Task} for a given job
+	 * Iterator over all {@link Task} for a given job.
 	 */
 	private final class TaskIterator implements Iterator<Task> {
 		private final Iterator<TaskSlot> taskSlotIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 8ec9a2e..84fb9e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,16 +40,16 @@ public class TimerService<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TimerService.class);
 
-	/** Executor service for the scheduled timeouts */
+	/** Executor service for the scheduled timeouts. */
 	private final ScheduledExecutorService scheduledExecutorService;
 
 	/** Timeout for the shutdown of the service. */
 	private final long shutdownTimeout;
 
-	/** Map of currently active timeouts */
+	/** Map of currently active timeouts. */
 	private final Map<K, Timeout<K>> timeouts;
 
-	/** Listener which is notified about occurring timeouts */
+	/** Listener which is notified about occurring timeouts. */
 	private TimeoutListener<K> timeoutListener;
 
 	public TimerService(
@@ -79,7 +80,7 @@ public class TimerService<K> {
 		scheduledExecutorService.shutdown();
 
 		try {
-			if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+			if (!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
 				LOG.debug("The scheduled executor service did not properly terminate. Shutting " +
 					"it down now.");
 				scheduledExecutorService.shutdownNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 2245a8c..ccaed96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -599,8 +599,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
 		WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasks);
-		taskManagerGateway.setCancelCondition(waitForTasksCancelled);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
+		taskManagerGateway.setCancelConsumer(waitForTasksCancelled);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -649,7 +649,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
 
 		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
 
 		waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);
 
@@ -685,7 +685,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasks);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -699,7 +699,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILING, eg.getState());
 
 		WaitForTasks waitForTasksRestart = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasksRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksRestart);
 
 		completeCancellingForAllVertices(eg);
 		waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
@@ -750,7 +750,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism * 2);
-		taskManagerGateway.setCondition(waitForTasks);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -766,7 +766,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILING, eg.getState());
 
 		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism * 2);
-		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
 
 		completeCancellingForAllVertices(eg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 682705a..628f004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,21 +48,27 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	private final String address = UUID.randomUUID().toString();
 
-	private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
+	private Optional<Consumer<ExecutionAttemptID>> optSubmitConsumer;
 
-	private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;
+	private Optional<Consumer<ExecutionAttemptID>> optCancelConsumer;
+
+	private volatile Consumer<Tuple2<AllocationID, Throwable>> freeSlotConsumer;
 
 	public SimpleAckingTaskManagerGateway() {
-		optSubmitCondition = Optional.empty();
-		optCancelCondition = Optional.empty();
+		optSubmitConsumer = Optional.empty();
+		optCancelConsumer = Optional.empty();
+	}
+
+	public void setSubmitConsumer(Consumer<ExecutionAttemptID> predicate) {
+		optSubmitConsumer = Optional.of(predicate);
 	}
 
-	public void setCondition(Consumer<ExecutionAttemptID> predicate) {
-		optSubmitCondition = Optional.of(predicate);
+	public void setCancelConsumer(Consumer<ExecutionAttemptID> predicate) {
+		optCancelConsumer = Optional.of(predicate);
 	}
 
-	public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) {
-		optCancelCondition = Optional.of(predicate);
+	public void setFreeSlotConsumer(Consumer<Tuple2<AllocationID, Throwable>> consumer) {
+		freeSlotConsumer = consumer;
 	}
 
 	@Override
@@ -92,7 +100,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-		optSubmitCondition.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
+		optSubmitConsumer.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
@@ -103,7 +111,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID));
+		optCancelConsumer.ifPresent(condition -> condition.accept(executionAttemptID));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
@@ -139,4 +147,15 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 	public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		final Consumer<Tuple2<AllocationID, Throwable>> currentFreeSlotConsumer = freeSlotConsumer;
+
+		if (currentFreeSlotConsumer != null) {
+			currentFreeSlotConsumer.accept(Tuple2.of(allocationId, cause));
+		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
new file mode 100644
index 0000000..939aece
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Dummy implementation of {@link ScheduledUnit} for testing purposes.
+ */
+public class DummyScheduledUnit extends ScheduledUnit {
+	public DummyScheduledUnit() {
+		super(
+			null,
+			new JobVertexID(),
+			null,
+			null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 707ea00..6a8ef0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -44,33 +44,31 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_MOCKS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class SlotPoolTest extends TestLogger {
 
@@ -84,7 +82,7 @@ public class SlotPoolTest extends TestLogger {
 
 	private TaskManagerLocation taskManagerLocation;
 
-	private TaskManagerGateway taskManagerGateway;
+	private SimpleAckingTaskManagerGateway taskManagerGateway;
 
 	@Before
 	public void setUp() throws Exception {
@@ -102,7 +100,10 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testAllocateSimpleSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -112,17 +113,14 @@ public class SlotPoolTest extends TestLogger {
 			SlotRequestId requestId = new SlotRequestId();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -136,13 +134,21 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(slot.isAlive());
 			assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
 		} finally {
-			slotPool.shutDown();
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
 		}
 	}
 
 	@Test
 	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2);
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
+			while (!slotRequestQueue.offer(slotRequest)) {
+				// noop
+			}
+		});
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -151,14 +157,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -167,11 +173,11 @@ public class SlotPoolTest extends TestLogger {
 			assertFalse(future1.isDone());
 			assertFalse(future2.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
-				.requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			final List<SlotRequest> slotRequests = new ArrayList<>(2);
 
-			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
+			for (int i = 0; i < 2; i++) {
+				slotRequests.add(slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			}
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequests.get(0).getAllocationId(),
@@ -198,13 +204,16 @@ public class SlotPoolTest extends TestLogger {
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
 			assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
 		} finally {
-			slotPool.shutDown();
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
 		}
 	}
 
 	@Test
 	public void testAllocateWithFreeSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -213,17 +222,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future1.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -240,7 +246,7 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -262,7 +268,11 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testOfferSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -271,17 +281,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -320,7 +327,10 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testReleaseResource() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
 		final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
 
@@ -347,20 +357,17 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -527,13 +534,58 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
-	private static ResourceManagerGateway createResourceManagerGatewayMock() {
-		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway
-			.requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
+	/**
+	 * Tests that shutting down the SlotPool frees every slot that was offered to and accepted by it.
+	 */
+	@Test
+	public void testShutdownReleasesAllSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		try {
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+			final int numSlotOffers = 2;
+
+			final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
+
+			for (int i = 0; i < numSlotOffers; i++) {
+				slotOffers.add(
+					new SlotOffer(
+						new AllocationID(),
+						i,
+						ResourceProfile.UNKNOWN));
+			}
+
+			final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
 
-		return resourceManagerGateway;
+			taskManagerGateway.setFreeSlotConsumer(tuple -> {
+				while (!freedSlotQueue.offer(tuple.f0)) {} // capacity is numSlotOffers, so offer only fails while the queue is full
+			});
+
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffersFuture = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+			final Collection<SlotOffer> acceptedSlotOffers = acceptedSlotOffersFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));
+
+			// shut down the slot pool
+			slotPool.shutDown();
+			slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			// shut down must have freed all registered slots; poll with a timeout so a missing free fails the assertion instead of hanging
+			ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
+
+			for (int i = 0; i < numSlotOffers; i++) {
+				freedSlots.add(freedSlotQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			}
+
+			assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
 	}
 
 	private static SlotPoolGateway setupSlotPool(

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 94f325d..c6334c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -113,6 +113,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get()); // test stub: arguments are unused, returns an already-completed acknowledgement
+	}
+
+	@Override
 	public String getAddress() {
 		return address;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-streaming-scala/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/resources/log4j-test.properties b/flink-streaming-scala/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..7ba1633
--- /dev/null
+++ b/flink-streaming-scala/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console