You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:35 UTC

[2/7] flink git commit: [FLINK-1580] [taskmanager] Improve TaskManager startup robustness

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 19d55f7..05ede99 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -20,13 +20,12 @@ package org.apache.flink.runtime.taskmanager
 
 import org.apache.flink.configuration.Configuration
 
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
 
-case class TaskManagerConfiguration(numberOfSlots: Int,
-                                    memorySize: Long, pageSize: Int,
-                                    tmpDirPaths: Array[String],
+
+case class TaskManagerConfiguration(tmpDirPaths: Array[String],
                                     cleanupInterval: Long,
-                                    profilingInterval: Option[Long],
                                     timeout: FiniteDuration,
-                                    maxRegistrationDuration: Duration,
+                                    maxRegistrationDuration: Option[FiniteDuration],
+                                    numberOfSlots: Int,
                                     configuration: Configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5dd10c0..7380b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -43,8 +43,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -113,16 +115,16 @@ public class ExecutionGraphTestUtils {
 		public TaskDeploymentDescriptor lastTDD;
 		@Override
 		public void onReceive(Object msg) throws Exception {
-			if (msg instanceof TaskManagerMessages.SubmitTask) {
-				TaskManagerMessages.SubmitTask submitTask = (TaskManagerMessages.SubmitTask) msg;
+			if (msg instanceof SubmitTask) {
+				SubmitTask submitTask = (SubmitTask) msg;
 				lastTDD = submitTask.tasks();
 
 				getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(), true), getSelf());
-			} else if (msg instanceof TaskManagerMessages.CancelTask) {
-				TaskManagerMessages.CancelTask cancelTask = (TaskManagerMessages.CancelTask) msg;
+			} else if (msg instanceof CancelTask) {
+				CancelTask cancelTask = (CancelTask) msg;
 				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
 			}
-			else if (msg instanceof TaskManagerMessages.FailIntermediateResultPartitions) {
+			else if (msg instanceof FailIntermediateResultPartitions) {
 				getSender().tell(new Object(), getSelf());
 			}
 		}
@@ -133,13 +135,13 @@ public class ExecutionGraphTestUtils {
 	public static class SimpleFailingTaskManager extends UntypedActor {
 		@Override
 		public void onReceive(Object msg) throws Exception {
-			if (msg instanceof TaskManagerMessages.SubmitTask) {
-				TaskManagerMessages.SubmitTask submitTask = (TaskManagerMessages.SubmitTask) msg;
+			if (msg instanceof SubmitTask) {
+				SubmitTask submitTask = (SubmitTask) msg;
 
 				getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(),
 						false, ERROR_MESSAGE),	getSelf());
-			} else if (msg instanceof TaskManagerMessages.CancelTask) {
-				TaskManagerMessages.CancelTask cancelTask = (TaskManagerMessages.CancelTask) msg;
+			} else if (msg instanceof CancelTask) {
+				CancelTask cancelTask = (CancelTask) msg;
 				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index e199353..226b256 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -644,10 +644,10 @@ public class ExecutionVertexCancelTest {
 
 		@Override
 		public void onReceive(Object message) throws Exception {
-			if(message instanceof TaskManagerMessages.SubmitTask){
-				TaskDeploymentDescriptor tdd = ((TaskManagerMessages.SubmitTask) message).tasks();
+			if(message instanceof TaskMessages.SubmitTask){
+				TaskDeploymentDescriptor tdd = ((TaskMessages.SubmitTask) message).tasks();
 				getSender().tell(new TaskOperationResult(tdd.getExecutionId(), true), getSelf());
-			}else if(message instanceof TaskManagerMessages.CancelTask){
+			}else if(message instanceof TaskMessages.CancelTask){
 				index++;
 				if(index >= results.length){
 					getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 5713c10..7b72669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
new file mode 100644
index 0000000..1b4f5f3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration;
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager;
+import org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * The tests in this class verify the behavior of the TaskManager
+ * when connecting to the JobManager, and when the JobManager
+ * is unreachable.
+ */
+public class RegistrationTest {
+
+	private static final Option<String> NONE_STRING = Option.empty();
+
+	// use one actor system throughout all tests
+	private static ActorSystem actorSystem;
+
+	@BeforeClass
+	public static void startActorSystem() {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+		config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+		// pass the tuned config on; a fresh Configuration would silently drop the settings above
+		actorSystem = AkkaUtils.createLocalActorSystem(config);
+	}
+
+	@AfterClass
+	public static void shutdownActorSystem() {
+		if (actorSystem != null) {
+			actorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * A test that verifies that two TaskManagers correctly register at the
+	 * JobManager.
+	 */
+	@Test
+	public void testSimpleRegistration() {
+		new JavaTestKit(actorSystem) {{
+			try {
+				// a simple JobManager
+				ActorRef jobManager = startJobManager();
+
+				// start two TaskManagers. they will automatically try to register
+				final ActorRef taskManager1 = startTaskManager(jobManager);
+				final ActorRef taskManager2 = startTaskManager(jobManager);
+
+				// check that the TaskManagers are registered
+				Future<Object> responseFuture1 = Patterns.ask(
+						taskManager1,
+						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+						5000);
+
+				Future<Object> responseFuture2 = Patterns.ask(
+						taskManager2,
+						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+						5000);
+
+				Object response1 = Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
+				Object response2 = Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
+
+				// this is a hack to work around the way Java can interact with scala case objects
+				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+				assertTrue(response1 != null && confirmClass.isAssignableFrom(response1.getClass()));
+				assertTrue(response2 != null && confirmClass.isAssignableFrom(response2.getClass()));
+
+				// check that the JobManager has 2 TaskManagers registered
+				Future<Object> numTaskManagersFuture = Patterns.ask(
+						jobManager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						1000);
+
+				Integer count = (Integer) Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+				assertEquals(2, count.intValue());
+
+				stopActor(taskManager1);
+				stopActor(taskManager2);
+				stopActor(jobManager);
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}};
+	}
+
+	/**
+	 * A test that verifies that a TaskManager which is started before the
+	 * JobManager is reachable still registers once the JobManager comes up.
+	 */
+	@Test
+	public void testDelayedRegistration() {
+		new JavaTestKit(actorSystem) {{
+			try {
+				// start a TaskManager that tries to register at the JobManager before the JobManager is
+				// available. we give it the regular JobManager akka URL
+				final ActorRef taskManager = startTaskManager(JobManager.getLocalJobManagerAkkaURL(),
+																new Configuration());
+				// let it try for a bit
+				Thread.sleep(6000);
+
+				// now start the JobManager, with the regular akka URL
+				final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+
+				// check that the TaskManager is registered
+				Future<Object> responseFuture = Patterns.ask(
+						taskManager,
+						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+						30000);
+
+				Object response = Await.result(responseFuture, new FiniteDuration(30, TimeUnit.SECONDS));
+
+				// this is a hack to work around the way Java can interact with scala case objects
+				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+				assertTrue(response != null && confirmClass.isAssignableFrom(response.getClass()));
+
+				stopActor(taskManager);
+				stopActor(jobManager);
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}};
+	}
+
+	/**
+	 * Tests that the TaskManager shuts down when it cannot register at the
+	 * JobManager within the given maximum duration.
+	 *
+	 * Unfortunately, this test does not give good error messages.
+	 * (I have not figured out how to get any better message out of the
+	 * Akka TestKit than "ask timeout exception".)
+	 *
+	 * Anyways: An "ask timeout exception" here means that the TaskManager
+	 * did not shut down after its registration timeout expired.
+	 */
+	@Test
+	public void testShutdownAfterRegistrationDurationExpired() {
+		new JavaTestKit(actorSystem) {{
+			try {
+				// registration timeout of 500 milliseconds
+				Configuration tmConfig = new Configuration();
+				tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms");
+
+				// start the taskManager actor
+				final ActorRef taskManager = startTaskManager(JobManager.getLocalJobManagerAkkaURL(), tmConfig);
+
+				// make sure it terminates in time, since it cannot register at a JobManager
+				watch(taskManager);
+				new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+
+					@Override
+					protected void run() {
+						expectTerminated(taskManager);
+					}
+				};
+
+				stopActor(taskManager);
+			}
+			catch (Throwable e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}};
+	}
+
+	/**
+	 * Make sure that the TaskManager keeps trying to register, even after
+	 * registration attempts have been refused.
+	 */
+	@Test
+	public void testTaskManagerResumesConnectAfterRefusedRegistration() {
+		new JavaTestKit(actorSystem) {{
+			try {
+				// we make the test actor (the test kit) the JobManager to intercept
+				// the messages
+				final ActorRef taskManager = startTaskManager(getTestActor());
+
+				// check and decline initial registration
+				new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+					@Override
+					protected void run() {
+						// the TaskManager should try to register
+						expectMsgClass(RegisterTaskManager.class);
+
+						// we decline the registration
+						getLastSender().tell(new RefuseRegistration("test reason"), getTestActor());
+					}
+				};
+
+				// the TaskManager should wait a bit and retry...
+				FiniteDuration maxDelay = (FiniteDuration) TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
+				new Within(maxDelay) {
+
+					@Override
+					protected void run() {
+						expectMsgClass(RegisterTaskManager.class);
+					}
+				};
+
+				stopActor(taskManager);
+			}
+			catch (Throwable e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}};
+	}
+
+	/**
+	 * Validate that the TaskManager attempts to re-connect after it lost the connection
+	 * to the JobManager.
+	 */
+	@Test
+	public void testTaskManagerResumesConnectAfterJobManagerFailure() {
+		new JavaTestKit(actorSystem) {{
+			try {
+				final Props fakeJmProps = Props.create(ForwardingActor.class, getTestActor());
+				final String jobManagerName = "FAKE_JOB_MANAGER";
+
+				final ActorRef fakeJobManager1 = actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+
+				// the fake JobManager forwards all messages to the test actor (the test kit),
+				// so that the test can intercept them
+				final ActorRef taskManager = startTaskManager(fakeJobManager1);
+
+				// validate initial registration
+				new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+					@Override
+					protected void run() {
+						// the TaskManager should try to register
+						expectMsgClass(RegisterTaskManager.class);
+
+						// we accept the registration
+						taskManager.tell(new AcknowledgeRegistration(fakeJobManager1, new InstanceID(), 45234),
+										fakeJobManager1);
+					}
+				};
+
+				// kill the first forwarding JobManager
+				watch(fakeJobManager1);
+				stopActor(fakeJobManager1);
+
+				// wait for the killing to be completed
+				new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+					@Override
+					protected void run() {
+						expectTerminated(fakeJobManager1);
+					}
+				};
+
+				// now start the second fake JobManager and expect that
+				// the TaskManager registers again
+				// the second fake JM needs to have the same actor URL
+				final ActorRef fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+				// expect the next registration
+				new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+
+					@Override
+					protected void run() {
+						expectMsgClass(RegisterTaskManager.class);
+
+						// we accept the registration
+						taskManager.tell(new AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234),
+										fakeJobManager2);
+					}
+				};
+
+				stopActor(taskManager);
+				stopActor(fakeJobManager2);
+			}
+			catch (Throwable e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}};
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utility Functions
+	// --------------------------------------------------------------------------------------------
+
+	private static ActorRef startJobManager() throws Exception {
+		// start the actors. don't give names, so they get generated names and we
+		// avoid conflicts with the actor names
+		return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+	}
+
+	private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
+		return startTaskManager(jobManager.path().toString(), new Configuration());
+	}
+
+	private static ActorRef startTaskManager(String jobManagerUrl, Configuration config) throws Exception {
+		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+
+		return TaskManager.startTaskManagerComponentsAndActor(
+				config, actorSystem, "localhost",
+				NONE_STRING, // no actor name -> random
+				new Some<String>(jobManagerUrl), // job manager path
+				true, // local network stack only
+				TaskManager.class);
+	}
+
+	private static void stopActor(ActorRef actor) {
+		actor.tell(Kill.getInstance(), ActorRef.noSender());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utility Actor that only forwards messages
+	// --------------------------------------------------------------------------------------------
+
+	public static class ForwardingActor extends UntypedActor {
+
+		private final ActorRef target;
+
+		public ForwardingActor(ActorRef target) {
+			this.target = target;
+		}
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			target.forward(message, context());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
new file mode 100644
index 0000000..131549b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+public class TaskManagerComponentsStartupShutdownTest {
+
+	/**
+	 * Makes sure that all components are shut down when the TaskManager
+	 * actor is shut down.
+	 */
+	@Test
+	public void testComponentsStartupShutdown() {
+
+		final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
+		final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+		final int BUFFER_SIZE = 32 * 1024;
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 s");
+		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
+
+		ActorSystem actorSystem = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(config);
+
+			final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+
+			// create the components for the TaskManager manually
+			final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
+					TMP_DIR,
+					1000000,
+					timeout,
+					Option.<FiniteDuration>empty(),
+					1,
+					config);
+
+			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
+					32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty());
+
+			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
+
+			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE);
+			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
+			final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
+			final int numberOfSlots = 1;
+
+			// create the task manager
+			final Props tmProps = Props.create(TaskManager.class,
+					tmConfig, connectionInfo, jobManager.path().toString(),
+					memManager, ioManager, network, numberOfSlots);
+
+			final ActorRef taskManager = actorSystem.actorOf(tmProps);
+
+			new JavaTestKit(actorSystem) {{
+
+				// wait for the TaskManager to be registered (at most 5 seconds, so a broken run fails fast)
+				new Within(new FiniteDuration(5000, TimeUnit.MILLISECONDS)) {
+					@Override
+					protected void run() {
+						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+								getTestActor());
+
+						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+					}
+				};
+			}};
+
+			// the components should now all be initialized
+			assertTrue(network.isAssociated());
+
+			// shut down all actors and the actor system
+			// kill the TaskManager and the JobManager
+			taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+			jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+
+			// shut down the actors and the actor system
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
+			actorSystem = null;
+
+			// now that the TaskManager is shut down, the components should be shut down as well
+			assertFalse(network.isAssociated());
+			assertTrue(network.isShutdown());
+			assertTrue(ioManager.isProperlyShutDown());
+			assertTrue(memManager.isShutdown());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
new file mode 100644
index 0000000..d243fbf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Validates that the TaskManager startup properly obeys the configuration
+ * values.
+ */
+public class TaskManagerConfigurationTest {
+
+	@Test
+	public void testUsePreconfiguredNetworkInterface() {
+		try {
+			final String TEST_HOST_NAME = "testhostname";
+
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
+
+			Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config, "localhost", 7891);
+
+			// validate the configured test host name
+			assertEquals(TEST_HOST_NAME, address._1());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testActorSystemPortConfig() {
+		try {
+			// config with pre-configured hostname to speed up tests (no interface selection)
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+
+			// auto port
+			assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config, "localhost", 7891)._2());
+
+			// pre-defined port
+			final int testPort = 22551;
+			config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, testPort);
+			assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config, "localhost", 7891)._2());
+
+			// invalid port
+			try {
+				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
+				TaskManager.selectNetworkInterfaceAndPort(config, "localhost", 7891);
+				fail("should fail with an exception");
+			}
+			catch (IllegalConfigurationException e) {
+				// bam!
+			}
+
+			// invalid port
+			try {
+				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 100000);
+				TaskManager.selectNetworkInterfaceAndPort(config, "localhost", 7891);
+				fail("should fail with an exception");
+			}
+			catch (IllegalConfigurationException e) {
+				// bam!
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testNetworkInterfaceSelection() {
+		ServerSocket server;
+		String hostname = "localhost";
+
+		try {
+			InetAddress localhostAddress = InetAddress.getByName(hostname);
+			server = new ServerSocket(0, 50, localhostAddress);
+		}
+		catch (UnknownHostException e) {
+			// may happen if disconnected. skip test.
+			System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
+			return;
+		}
+		catch (IOException e) {
+			// may happen in certain test setups, skip test.
+			System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
+			return;
+		}
+
+		try {
+			// open a server port to allow the system to connect
+			Configuration config = new Configuration();
+
+			assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config, hostname, server.getLocalPort())._1());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				server.close();
+			} catch (IOException e) {
+				// ignore shutdown errors
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index f4ee52f..3e65916 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -139,7 +139,7 @@ public class TaskManagerProcessReapingTest {
 			// wait for max 5 seconds for the process to terminate
 			{
 				long now = System.currentTimeMillis();
-				long deadline = now + 5000;
+				long deadline = now + 10000;
 
 				while (now < deadline && isProcessAlive(taskManagerProcess)) {
 					Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 640ccc3..e736a55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
@@ -48,14 +49,17 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -93,10 +97,12 @@ public class TaskManagerTest {
 	@Test
 	public void testSetupTaskManager() {
 		new JavaTestKit(system){{
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
 			try {
-				ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class));
+				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
 
-				final ActorRef tm = createTaskManager(jobManager);
+				taskManager = createTaskManager(jobManager);
 
 				JobID jid = new JobID();
 				JobVertexID vid = new JobVertexID();
@@ -108,11 +114,12 @@ public class TaskManagerTest {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
-				new Within(duration("1 seconds")){
+				final ActorRef tmClosure = taskManager;
+				new Within(duration("2 seconds")) {
 
 					@Override
 					protected void run() {
-						tm.tell(new SubmitTask(tdd), getRef());
+						tmClosure.tell(new SubmitTask(tdd), getRef());
 						expectMsgEquals(new TaskOperationResult(eid, true));
 					}
 				};
@@ -121,16 +128,27 @@ public class TaskManagerTest {
 				e.printStackTrace();
 				fail(e.getMessage());
 			}
+			finally {
+				// shut down the actors
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
 		}};
 	}
 	
 	@Test
 	public void testJobSubmissionAndCanceling() {
 		new JavaTestKit(system){{
-			try {
 
-				ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class));
-				final ActorRef tm = createTaskManager(jobManager);
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
+			try {
+				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
+				taskManager = createTaskManager(jobManager);
 
 				final JobID jid1 = new JobID();
 				final JobID jid2 = new JobID();
@@ -153,6 +171,7 @@ public class TaskManagerTest {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
+				final ActorRef tm = taskManager;
 				final FiniteDuration d = duration("1 second");
 
 				new Within(d) {
@@ -220,21 +239,34 @@ public class TaskManagerTest {
 						}
 					}
 				};
-			}catch(Exception e){
+			}
+			catch(Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
 			}
-
+			finally {
+				// shut down the actors
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
 		}};
 	}
 	
 	@Test
 	public void testGateChannelEdgeMismatch() {
 		new JavaTestKit(system){{
+
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
 			try {
-				ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class));
+				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
 
-				final ActorRef tm = createTaskManager(jm);
+				taskManager = createTaskManager(jobManager);
+				final ActorRef tm = taskManager;
 
 				final JobID jid = new JobID();
 
@@ -260,127 +292,152 @@ public class TaskManagerTest {
 
 					@Override
 					protected void run() {
-						try {
-							tm.tell(new SubmitTask(tdd1), getRef());
-							tm.tell(new SubmitTask(tdd2), getRef());
-							TaskOperationResult result = expectMsgClass(TaskOperationResult.class);
-							assertFalse(result.success());
-							assertEquals(eid1, result.executionID());
+						tm.tell(new SubmitTask(tdd1), getRef());
+						tm.tell(new SubmitTask(tdd2), getRef());
 
-							result = expectMsgClass(TaskOperationResult.class);
-							assertFalse(result.success());
-							assertEquals(eid2, result.executionID());
+						TaskOperationResult result = expectMsgClass(TaskOperationResult.class);
+						assertFalse(result.success());
+						assertEquals(eid1, result.executionID());
 
-							tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									getRef());
-							tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									getRef());
+						result = expectMsgClass(TaskOperationResult.class);
+						assertFalse(result.success());
+						assertEquals(eid2, result.executionID());
 
-							expectMsgEquals(true);
-							expectMsgEquals(true);
+						tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+								getRef());
+						tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+								getRef());
 
-							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
-									.ResponseRunningTasks.class).asJava();
+						expectMsgEquals(true);
+						expectMsgEquals(true);
 
-							assertEquals(0, tasks.size());
-						}catch (Exception e) {
-							e.printStackTrace();
-							fail(e.getMessage());
-						}
+						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
+								.ResponseRunningTasks.class).asJava();
+
+						assertEquals(0, tasks.size());
 					}
 				};
-			}catch (Exception e) {
+			}
+			catch (Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
 			}
+			finally {
+				// shut down the actors
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
 		}};
 	}
 	
 	@Test
 	public void testRunJobWithForwardChannel() {
 		new JavaTestKit(system){{
-			final JobID jid = new JobID();
 
-			JobVertexID vid1 = new JobVertexID();
-			JobVertexID vid2 = new JobVertexID();
-
-			final ExecutionAttemptID eid1 = new ExecutionAttemptID();
-			final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
+			try {
+				final JobID jid = new JobID();
 
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
-			final ActorRef tm = createTaskManager(jm);
+				JobVertexID vid1 = new JobVertexID();
+				JobVertexID vid2 = new JobVertexID();
 
-			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-			List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-			irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+				jobManager = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
 
-			InputGateDeploymentDescriptor ircdd =
-					new InputGateDeploymentDescriptor(
-							new IntermediateDataSetID(),
-							0, new InputChannelDeploymentDescriptor[]{
-									new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
-							}
-					);
+				taskManager = createTaskManager(jobManager);
+				final ActorRef tm = taskManager;
 
-			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
-					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-					irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
-			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
-					new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
-					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-					Collections.singletonList(ircdd),
-					new ArrayList<BlobKey>(), 0);
+				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
-			final FiniteDuration d = duration("1 second");
+				InputGateDeploymentDescriptor ircdd =
+						new InputGateDeploymentDescriptor(
+								new IntermediateDataSetID(),
+								0, new InputChannelDeploymentDescriptor[]{
+										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+								}
+						);
 
-			new Within(d) {
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
 
-				@Override
-				protected void run() {
-					try {
-						tm.tell(new SubmitTask(tdd1), getRef());
-						expectMsgEquals(new TaskOperationResult(eid1, true));
-						tm.tell(new SubmitTask(tdd2), getRef());
-						expectMsgEquals(new TaskOperationResult(eid2, true));
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.singletonList(ircdd),
+						new ArrayList<BlobKey>(), 0);
 
-						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
-								.class).asJava();
+				final FiniteDuration d = duration("1 second");
 
-						Task t1 = tasks.get(eid1);
-						Task t2 = tasks.get(eid2);
+				new Within(d) {
 
-						// wait until the tasks are done. rare thread races may cause the tasks to be done before
-						// we get to the check, so we need to guard the check
-						if (t1 != null) {
-							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									timeout);
-							Await.ready(response, d);
-						}
+					@Override
+					protected void run() {
+						try {
+							tm.tell(new SubmitTask(tdd1), getRef());
+							expectMsgEquals(new TaskOperationResult(eid1, true));
+							tm.tell(new SubmitTask(tdd2), getRef());
+							expectMsgEquals(new TaskOperationResult(eid2, true));
 
-						if (t2 != null) {
-							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									timeout);
-							Await.ready(response, d);
-							assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
-						}
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
+									.class).asJava();
+
+							Task t1 = tasks.get(eid1);
+							Task t2 = tasks.get(eid2);
+
+							// wait until the tasks are done. rare thread races may cause the tasks to be done before
+							// we get to the check, so we need to guard the check
+							if (t1 != null) {
+								Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+										timeout);
+								Await.ready(response, d);
+							}
 
-						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-						tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
-								.class).asJava();
+							if (t2 != null) {
+								Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+										timeout);
+								Await.ready(response, d);
+								assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
+							}
 
-						assertEquals(0, tasks.size());
-					}
-					catch (Exception e) {
-						e.printStackTrace();
-						fail(e.getMessage());
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+							tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
+									.class).asJava();
 
+							assertEquals(0, tasks.size());
+						}
+						catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
 					}
+				};
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+			finally {
+				// shut down the actors
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
 				}
-			};
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
 		}};
 	}
 	
@@ -390,109 +447,128 @@ public class TaskManagerTest {
 		// this tests creates two tasks. the sender sends data, and fails to send the
 		// state update back to the job manager
 		// the second one blocks to be canceled
-
 		new JavaTestKit(system){{
-			final JobID jid = new JobID();
 
-			JobVertexID vid1 = new JobVertexID();
-			JobVertexID vid2 = new JobVertexID();
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
+			try {
+				final JobID jid = new JobID();
 
-			final ExecutionAttemptID eid1 = new ExecutionAttemptID();
-			final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+				JobVertexID vid1 = new JobVertexID();
+				JobVertexID vid2 = new JobVertexID();
 
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
-			final ActorRef tm = createTaskManager(jm);
+				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+				jobManager = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
+				taskManager = createTaskManager(jobManager);
+				final ActorRef tm = taskManager;
 
-			List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-			irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
-			InputGateDeploymentDescriptor ircdd =
-					new InputGateDeploymentDescriptor(
-							new IntermediateDataSetID(),
-							0, new InputChannelDeploymentDescriptor[]{
-									new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
-							}
-					);
+				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
-			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
-					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-					irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+				InputGateDeploymentDescriptor ircdd =
+						new InputGateDeploymentDescriptor(
+								new IntermediateDataSetID(),
+								0, new InputChannelDeploymentDescriptor[]{
+										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+								}
+						);
 
-			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
-					new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
-					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-					Collections.singletonList(ircdd),
-					new ArrayList<BlobKey>(), 0);
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
+						new ArrayList<BlobKey>(), 0);
 
-			final FiniteDuration d = duration("1 second");
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.singletonList(ircdd),
+						new ArrayList<BlobKey>(), 0);
 
-			new Within(d){
+				final FiniteDuration d = duration("1 second");
 
-				@Override
-				protected void run() {
-					try {
-						// deploy sender before receiver, so the target is online when the sender requests the connection info
-						tm.tell(new SubmitTask(tdd2), getRef());
-						tm.tell(new SubmitTask(tdd1), getRef());
+				new Within(d){
 
-						expectMsgEquals(new TaskOperationResult(eid2, true));
-						expectMsgEquals(new TaskOperationResult(eid1, true));
+					@Override
+					protected void run() {
+						try {
+							// deploy receiver before sender, so the target is online when the sender requests the connection info
+							tm.tell(new SubmitTask(tdd2), getRef());
+							tm.tell(new SubmitTask(tdd1), getRef());
 
-						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
-								.ResponseRunningTasks.class).asJava();
+							expectMsgEquals(new TaskOperationResult(eid2, true));
+							expectMsgEquals(new TaskOperationResult(eid1, true));
 
-						Task t1 = tasks.get(eid1);
-						Task t2 = tasks.get(eid2);
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
+									.ResponseRunningTasks.class).asJava();
 
-						tm.tell(new CancelTask(eid2), getRef());
-						expectMsgEquals(new TaskOperationResult(eid2, true));
+							Task t1 = tasks.get(eid1);
+							Task t2 = tasks.get(eid2);
 
-						if (t2 != null) {
-							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-									timeout);
-							Await.ready(response, d);
-						}
+							tm.tell(new CancelTask(eid2), getRef());
+							expectMsgEquals(new TaskOperationResult(eid2, true));
 
-						if (t1 != null) {
-							if (t1.getExecutionState() == ExecutionState.RUNNING) {
-								tm.tell(new CancelTask(eid1), getRef());
-								expectMsgEquals(new TaskOperationResult(eid1, true));
+							if (t2 != null) {
+								Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+										timeout);
+								Await.ready(response, d);
 							}
-							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-									timeout);
-							Await.ready(response, d);
-						}
 
-						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-						tasks = expectMsgClass(TestingTaskManagerMessages
-								.ResponseRunningTasks.class).asJava();
+							if (t1 != null) {
+								if (t1.getExecutionState() == ExecutionState.RUNNING) {
+									tm.tell(new CancelTask(eid1), getRef());
+									expectMsgEquals(new TaskOperationResult(eid1, true));
+								}
+								Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+										timeout);
+								Await.ready(response, d);
+							}
 
-						assertEquals(0, tasks.size());
-					}catch(Exception e){
-						e.printStackTrace();
-						fail(e.getMessage());
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+							tasks = expectMsgClass(TestingTaskManagerMessages
+									.ResponseRunningTasks.class).asJava();
+
+							assertEquals(0, tasks.size());
+						}
+						catch(Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
 					}
+				};
+			}
+			catch(Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+			finally {
+				// shut down the actors
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
 				}
-			};
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
 		}};
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	public static class SimpleJobManager extends UntypedActor{
+	public static class SimpleJobManager extends UntypedActor {
 
 		@Override
 		public void onReceive(Object message) throws Exception {
-			if(message instanceof RegistrationMessages.RegisterTaskManager){
+			if (message instanceof RegistrationMessages.RegisterTaskManager) {
 				final InstanceID iid = new InstanceID();
-				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1,
-								Option.<ActorRef>apply(null)),
-						getSelf());
-			}else if(message instanceof JobManagerMessages.UpdateTaskExecutionState){
+				final ActorRef self = getSelf();
+				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+			}
+			else if(message instanceof TaskMessages.UpdateTaskExecutionState){
 				getSender().tell(true, getSelf());
 			}
 		}
@@ -514,7 +590,7 @@ public class TaskManagerTest {
 
 		@Override
 		public void onReceive(Object message) throws Exception{
-			if (message instanceof JobManagerMessages.UpdateTaskExecutionState) {
+			if (message instanceof TaskMessages.UpdateTaskExecutionState) {
 				getSender().tell(false, getSelf());
 			} else {
 				super.onReceive(message);
@@ -538,14 +614,24 @@ public class TaskManagerTest {
 		}
 	}
 
-	public static ActorRef createTaskManager(ActorRef jm) {
-		Configuration cfg = new Configuration();
-		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+	public static ActorRef createTaskManager(ActorRef jobManager) {
+		ActorRef taskManager = null;
+		try {
+			Configuration cfg = new Configuration();
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
 
-		String jobManagerURL = jm.path().toString();
+			Option<String> jobMangerUrl = Option.apply(jobManager.path().toString());
 
-		ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
-				jobManagerURL, cfg, system);
+			taskManager = TaskManager.startTaskManagerComponentsAndActor(
+					cfg, system, "localhost",
+					Option.<String>empty(),
+					jobMangerUrl,
+					true, TestingTaskManager.class);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not create test TaskManager: " + e.getMessage());
+		}
 
 		Future<Object> response = Patterns.ask(taskManager, 
 				TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
@@ -554,8 +640,9 @@ public class TaskManagerTest {
 			FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
 			Await.ready(response, d);
 		}
-		catch(Exception e) {
-			throw new RuntimeException("Exception while waiting for the task manager registration.", e);
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Exception while waiting for the task manager registration: " + e.getMessage());
 		}
 
 		return taskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 689b22d..3a8fcd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -46,7 +46,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 9c329d1..8c8ce06 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -31,14 +31,14 @@ import org.junit.Test
 
 import scala.concurrent.duration.Duration
 
+/**
+ * Tests that a lookup of a local JobManager fails within a given timeout if the JobManager
+ * actor is not reachable.
+ */
 class JobManagerConnectionTest {
 
   private val timeout = 1000
 
-  /**
-   * Tests that a lookup of a local JobManager fails within a given timeout if the JobManager
-   * actor is not reachable.
-   */
   @Test
   def testResolveUnreachableActorLocalHost() : Unit = {
     // startup a test actor system listening at an arbitrary address
@@ -54,7 +54,7 @@ class JobManagerConnectionTest {
       }
 
       val endpoint = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), freePort)
-      val config = getConfigWithLowTimeout()
+      val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
         () => {
@@ -90,7 +90,7 @@ class JobManagerConnectionTest {
     try {
       // some address that is not running a JobManager
       val endpoint = new InetSocketAddress(InetAddress.getByName("10.254.254.254"), 2)
-      val config = getConfigWithLowTimeout()
+      val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
         () => {
@@ -114,7 +114,7 @@ class JobManagerConnectionTest {
     }
   }
 
-  private def getConfigWithLowTimeout() : Configuration = {
+  private def createConfigWithLowTimeout() : Configuration = {
     val config = new Configuration()
     config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT,
                      Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s")

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
deleted file mode 100644
index 58905ef..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import java.net.InetAddress
-
-import akka.actor._
-import akka.testkit.{TestKit, ImplicitSender}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
-RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import scala.concurrent.duration._
-
-import scala.language.postfixOps
-
-class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with
-ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The JobManager" should {
-    "notify already registered TaskManagers" in {
-
-      val jm = TestingUtils.startTestingJobManager(_system)
-
-      val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
-      val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
-          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
-
-          expectMsgType[AcknowledgeRegistration]
-          expectMsgType[AlreadyRegistered]
-        }
-      } finally {
-        jm ! Kill
-      }
-    }
-  }
-
-  "The TaskManager" should {
-    "shutdown if its registration is refused by the JobManager" in {
-
-      val tm = TestingUtils.startTestingTaskManager(self, _system)
-
-      watch(tm)
-
-      try{
-        within(TestingUtils.TESTING_DURATION) {
-          expectMsgType[RegisterTaskManager]
-          tm ! RefuseRegistration("Testing connection refusal")
-
-          expectTerminated(tm)
-        }
-      }
-    }
-
-    "ignore RefuseRegistration messages after it has been successfully registered" in {
-
-      val tm = TestingUtils.startTestingTaskManager(self, _system)
-
-      try {
-        ignoreMsg{
-          case _: Heartbeat => true
-        }
-        within(TestingUtils.TESTING_DURATION) {
-          expectMsgType[RegisterTaskManager]
-
-          tm ! AcknowledgeRegistration(new InstanceID(), 42, None)
-
-          tm ! RefuseRegistration("Should be ignored")
-
-          // Check if the TaskManager is still alive
-          tm ! Identify(1)
-
-          expectMsgType[ActorIdentity]
-
-        }
-      } finally {
-        tm ! Kill
-      }
-    }
-
-    "shutdown after the maximum registration duration has been exceeded" in {
-
-      val config = new Configuration()
-      config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second")
-
-      val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
-        self.path.toString, config, _system)
-
-      watch(tm)
-
-      try {
-        ignoreMsg{
-          case _: RegisterTaskManager => true
-        }
-        within(2 seconds) {
-          expectTerminated(tm)
-        }
-      } finally {
-        tm ! Kill
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
new file mode 100644
index 0000000..409e98d
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.net.InetAddress
+
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
+import org.junit.Assert.{assertNotEquals, assertNotNull}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Tests for the JobManager's behavior when a TaskManager solicits registration:
+ * every registering TaskManager must be assigned a unique instance ID, and repeated
+ * registration calls of the same TaskManager must be answered with AlreadyRegistered.
+ */
+class TaskManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The JobManager" should {
+
+    "assign a TaskManager a unique instance ID" in {
+      val jm = startTestingJobManager(_system)
+
+      val tmDummy1 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+      val tmDummy2 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+
+      try {
+        val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
+        val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+
+        val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+        var id1: InstanceID = null
+        var id2: InstanceID = null
+
+        // task manager 1
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy1, connectionInfo1, hardwareDescription, 1)
+
+          val response = receiveOne(1 second)
+          response match {
+            case AcknowledgeRegistration(_, id, _) => id1 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
+
+        // task manager 2
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy2, connectionInfo2, hardwareDescription, 1)
+
+          val response = receiveOne(1 second)
+          response match {
+            case AcknowledgeRegistration(_, id, _) => id2 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
+
+        assertNotNull(id1)
+        assertNotNull(id2)
+        assertNotEquals(id1, id2)
+      }
+      finally {
+        tmDummy1 ! Kill
+        tmDummy2 ! Kill
+        jm ! Kill
+      }
+    }
+
+    "handle repeated registration calls" in {
+
+      val jm = startTestingJobManager(_system)
+      val tmDummy = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+
+      try {
+        val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+        val hardwareDescription = HardwareDescription.extractFromSystem(10)
+        
+        within(1 second) {
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+          jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+
+          expectMsgType[AcknowledgeRegistration]
+          expectMsgType[AlreadyRegistered]
+          expectMsgType[AlreadyRegistered]
+        }
+      } finally {
+        tmDummy ! Kill
+        jm ! Kill
+      }
+    }
+  }
+
+  /** Starts a JobManager with a default configuration in the given actor system. */
+  private def startTestingJobManager(system: ActorSystem): ActorRef = {
+    // use the passed-in system (not the class field); only the first tuple element is needed
+    JobManager.startJobManagerActors(new Configuration(), system, None, None)._1
+  }
+}
+
+object TaskManagerRegistrationTest {
+
+  /** Simple dummy actor that swallows all messages */
+  class DummyActor extends Actor {
+    override def receive: Receive = {
+      case _ =>
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 4a72694..f87e151 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -67,7 +67,17 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
 
     val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
 
-    TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName,
-      singleActorSystem, numTaskManagers == 1, classOf[TestingTaskManager])
+    val jobManagerPath: Option[String] = if (singleActorSystem) {
+      Some(jobManagerActor.path.toString)
+    } else {
+      None
+    }
+
+    TaskManager.startTaskManagerComponentsAndActor(configuration, system,
+                                                   HOSTNAME,
+                                                   Some(tmActorName),
+                                                   jobManagerPath,
+                                                   numTaskManagers == 1,
+                                                   classOf[TestingTaskManager])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index a47e4e7..702e34c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -22,9 +22,12 @@ import akka.actor.{Terminated, ActorRef}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.Messages.Disconnect
-import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
-import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
+import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask
+import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
@@ -35,11 +38,15 @@ import scala.language.postfixOps
 /**
  * Subclass of the [[TaskManager]] to support testing messages
  */
-class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
+class TestingTaskManager(config: TaskManagerConfiguration,
+                         connectionInfo: InstanceConnectionInfo,
                          jobManagerAkkaURL: String,
-                         taskManagerConfig: TaskManagerConfiguration,
-                         networkConfig: NetworkEnvironmentConfiguration)
-  extends TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConfig) {
+                         memoryManager: DefaultMemoryManager,
+                         ioManager: IOManager,
+                         network: NetworkEnvironment,
+                         numberOfSlots: Int)
+  extends TaskManager(config, connectionInfo, jobManagerAkkaURL,
+                      memoryManager, ioManager, network, numberOfSlots) {
 
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
@@ -81,12 +88,12 @@ class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
         bcVarManager.getNumberOfVariablesWithReferences)
 
     case RequestNumActiveConnections =>
-      networkEnvironment match {
-        case Some(ne) => sender ! ResponseNumActiveConnections(
-          ne.getConnectionManager.getNumberOfActiveConnections)
-
-        case None => sender ! ResponseNumActiveConnections(0)
-      }
+      val numActive = if (network.isAssociated) {
+                        network.getConnectionManager.getNumberOfActiveConnections
+                      } else {
+                        0
+                      }
+      sender ! ResponseNumActiveConnections(numActive)
 
     case NotifyWhenJobRemoved(jobID) =>
       if(runningTasks.values.exists(_.getJobID == jobID)){
@@ -129,7 +136,7 @@ class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
       if (!disconnectDisabled) {
         super.receiveWithLogMessages(msg)
 
-        val jobManager = sender
+        val jobManager = sender()
 
         waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
           _ foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 9bb3d0b..63dce31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import com.typesafe.config.ConfigFactory
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import scala.concurrent.duration._
 
@@ -57,34 +56,18 @@ object TestingUtils {
 
   def getDefaultTestingActorSystemConfig = testConfig
 
-  def startTestingJobManager(system: ActorSystem): ActorRef = {
-    val config = new Configuration()
-
-    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ ,
-        executionRetries, delayBetweenRetries,
-        timeout, archiveCount) = JobManager.createJobManagerComponents(config)
-
-    val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
-    val archive = system.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
-    val jobManagerProps = Props(new JobManager(config, instanceManager, scheduler,
-      libraryCacheManager, archive, accumulatorManager, None, executionRetries,
-      delayBetweenRetries, timeout) with TestingJobManager)
-
-    system.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
-  }
 
   def startTestingTaskManagerWithConfiguration(hostname: String,
                                                jobManagerURL: String,
                                                config: Configuration,
-                                               system: ActorSystem) = {
+                                               system: ActorSystem) : ActorRef = {
 
-    val (tmConfig, netConfig, connectionInfo, _) =
-      TaskManager.parseTaskManagerConfiguration(config, hostname, true, false)
 
-    val tmProps = Props(classOf[TestingTaskManager], connectionInfo,
-                        jobManagerURL, tmConfig, netConfig)
-    system.actorOf(tmProps)
+    TaskManager.startTaskManagerComponentsAndActor(config, system,
+                                                   hostname,
+                                                   None, // random actor name
+                                                   Some(jobManagerURL), // job manager
+                                                   true, classOf[TestingTaskManager])
   }
 
   def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
@@ -92,11 +75,7 @@ object TestingUtils {
     val jmURL = jobManager.path.toString
     val config = new Configuration()
 
-    val (tmConfig, netConfig, connectionInfo, _) =
-      TaskManager.parseTaskManagerConfiguration(config,  "localhost", true, true)
-
-    val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig)
-    system.actorOf(tmProps)
+    startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
   }
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 2641bc1..5b80531 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -26,8 +26,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.jobmanager.BarrierAck;
-import org.apache.flink.runtime.jobmanager.StateBarrierAck;
+import org.apache.flink.runtime.messages.CheckpointingMessages;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.state.StateHandle;
@@ -116,12 +115,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 		if (configuration.getStateMonitoring() && !states.isEmpty()) {
 			getEnvironment().getJobManager().tell(
-					new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+					new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
 							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
 							new LocalStateHandle(states)), ActorRef.noSender());
 		} else {
 			getEnvironment().getJobManager().tell(
-					new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+					new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
 							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 6df8099..ddfffee 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -110,8 +110,14 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
 
     val localExecution = numTaskManagers == 1
 
-    TaskManager.startTaskManagerActor(config, system, HOSTNAME,
-        TaskManager.TASK_MANAGER_NAME + index, singleActorSystem, localExecution,
+    val jobManagerAkkaUrl: Option[String] = if (singleActorSystem) {
+      Some(jobManagerActor.path.toString)
+    } else {
+      None
+    }
+
+    TaskManager.startTaskManagerComponentsAndActor(config, system, HOSTNAME,
+        Some(TaskManager.TASK_MANAGER_NAME + index), jobManagerAkkaUrl, localExecution,
          classOf[TestingTaskManager])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 4311e9c..2901bf8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -139,7 +139,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
 
 			// we wait for the JobManager to have the two TaskManagers available
-			// wait for at most 20 seconds
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+			// wait for at most 30 seconds
+			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 30000);
 
 			// the program will set a marker file in each of its parallel tasks once they are ready, so that
 			// this coordinating code is aware of this.
@@ -174,7 +174,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
 			new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
 
-			// we wait for the third TaskManager to register (20 seconds max)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+			// we wait for the third TaskManager to register (30 seconds max)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 30000);
 
 			// kill one of the previous TaskManagers, triggering a failure and recovery
 			taskManagerProcess1.destroy();
@@ -369,7 +369,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
-				TaskManager.runTaskManager(cfg, TaskManager.class);
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
 
 				// wait forever
 				Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 6fa7a61..3556ec1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -87,7 +87,7 @@ public class YarnTaskManagerRunner {
 			@Override
 			public Object run() {
 				try {
-					TaskManager.runTaskManager(configuration, YarnTaskManager.class);
+					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, YarnTaskManager.class);
 				}
 				catch (Throwable t) {
 					LOG.error("Error while starting the TaskManager", t);