You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:35 UTC
[2/7] flink git commit: [FLINK-1580] [taskmanager] Improve
TaskManager startup robustness
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 19d55f7..05ede99 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -20,13 +20,12 @@ package org.apache.flink.runtime.taskmanager
import org.apache.flink.configuration.Configuration
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
-case class TaskManagerConfiguration(numberOfSlots: Int,
- memorySize: Long, pageSize: Int,
- tmpDirPaths: Array[String],
+
+case class TaskManagerConfiguration(tmpDirPaths: Array[String],
cleanupInterval: Long,
- profilingInterval: Option[Long],
timeout: FiniteDuration,
- maxRegistrationDuration: Duration,
+ maxRegistrationDuration: Option[FiniteDuration],
+ numberOfSlots: Int,
configuration: Configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5dd10c0..7380b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -43,8 +43,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -113,16 +115,16 @@ public class ExecutionGraphTestUtils {
public TaskDeploymentDescriptor lastTDD;
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof TaskManagerMessages.SubmitTask) {
- TaskManagerMessages.SubmitTask submitTask = (TaskManagerMessages.SubmitTask) msg;
+ if (msg instanceof SubmitTask) {
+ SubmitTask submitTask = (SubmitTask) msg;
lastTDD = submitTask.tasks();
getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(), true), getSelf());
- } else if (msg instanceof TaskManagerMessages.CancelTask) {
- TaskManagerMessages.CancelTask cancelTask = (TaskManagerMessages.CancelTask) msg;
+ } else if (msg instanceof CancelTask) {
+ CancelTask cancelTask = (CancelTask) msg;
getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
}
- else if (msg instanceof TaskManagerMessages.FailIntermediateResultPartitions) {
+ else if (msg instanceof FailIntermediateResultPartitions) {
getSender().tell(new Object(), getSelf());
}
}
@@ -133,13 +135,13 @@ public class ExecutionGraphTestUtils {
public static class SimpleFailingTaskManager extends UntypedActor {
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof TaskManagerMessages.SubmitTask) {
- TaskManagerMessages.SubmitTask submitTask = (TaskManagerMessages.SubmitTask) msg;
+ if (msg instanceof SubmitTask) {
+ SubmitTask submitTask = (SubmitTask) msg;
getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(),
false, ERROR_MESSAGE), getSelf());
- } else if (msg instanceof TaskManagerMessages.CancelTask) {
- TaskManagerMessages.CancelTask cancelTask = (TaskManagerMessages.CancelTask) msg;
+ } else if (msg instanceof CancelTask) {
+ CancelTask cancelTask = (CancelTask) msg;
getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index e199353..226b256 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -644,10 +644,10 @@ public class ExecutionVertexCancelTest {
@Override
public void onReceive(Object message) throws Exception {
- if(message instanceof TaskManagerMessages.SubmitTask){
- TaskDeploymentDescriptor tdd = ((TaskManagerMessages.SubmitTask) message).tasks();
+ if(message instanceof TaskMessages.SubmitTask){
+ TaskDeploymentDescriptor tdd = ((TaskMessages.SubmitTask) message).tasks();
getSender().tell(new TaskOperationResult(tdd.getExecutionId(), true), getSelf());
- }else if(message instanceof TaskManagerMessages.CancelTask){
+ }else if(message instanceof TaskMessages.CancelTask){
index++;
if(index >= results.length){
getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf());
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 5713c10..7b72669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
new file mode 100644
index 0000000..1b4f5f3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/RegistrationTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration;
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager;
+import org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * The tests in this class verify the behavior of the TaskManager
+ * when connecting to the JobManager, and when the JobManager
+ * is unreachable.
+ */
+public class RegistrationTest {
+
+ private static final Option<String> NONE_STRING = Option.empty();
+
+ // use one actor system throughout all tests
+ private static ActorSystem actorSystem;
+
+ /**
+ * Creates the single actor system that is shared by all tests in this class,
+ * configured with a short ask timeout and short death-watch settings.
+ */
+ @BeforeClass
+ public static void startActorSystem() {
+ Configuration config = new Configuration();
+ // the values have to be written with the setters; the getters only read a
+ // value (returning the given default) and would leave the configuration empty
+ config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+
+ // hand over the populated configuration rather than a fresh, empty one
+ actorSystem = AkkaUtils.createLocalActorSystem(config);
+ }
+
+ /**
+ * Shuts down the shared actor system, if one was created.
+ */
+ @AfterClass
+ public static void shutdownActorSystem() {
+ if (actorSystem == null) {
+ return;
+ }
+ actorSystem.shutdown();
+ }
+
+ /**
+ * A test that verifies that two TaskManagers correctly register at the
+ * JobManager.
+ *
+ * The test starts a JobManager and two TaskManagers, asks each TaskManager to
+ * notify it once registered, and finally asks the JobManager for its number
+ * of registered TaskManagers, which must be 2.
+ */
+ @Test
+ public void testSimpleRegistration() {
+ new JavaTestKit(actorSystem) {{
+ try {
+ // a simple JobManager
+ ActorRef jobManager = startJobManager();
+
+ // start two TaskManagers; each one automatically tries to register
+ final ActorRef taskManager1 = startTaskManager(jobManager);
+ final ActorRef taskManager2 = startTaskManager(jobManager);
+
+ // check that the TaskManagers are registered
+ Future<Object> responseFuture1 = Patterns.ask(
+ taskManager1,
+ TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ 5000);
+
+ Future<Object> responseFuture2 = Patterns.ask(
+ taskManager2,
+ TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ 5000);
+
+ // block until both TaskManagers have answered (or the wait times out)
+ Object response1 = Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
+ Object response2 = Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
+
+ // this is a hack to work around the way Java can interact with scala case objects:
+ // the responses are compared by class against the confirmation message
+ Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+ assertTrue(response1 != null && confirmClass.isAssignableFrom(response1.getClass()));
+ assertTrue(response2 != null && confirmClass.isAssignableFrom(response2.getClass()));
+
+ // check that the JobManager has 2 TaskManagers registered
+ Future<Object> numTaskManagersFuture = Patterns.ask(
+ jobManager,
+ JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+ 1000);
+
+ Integer count = (Integer) Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+ assertEquals(2, count.intValue());
+
+ // clean up the actors started by this test
+ stopActor(taskManager1);
+ stopActor(taskManager2);
+ stopActor(jobManager);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }};
+ }
+
+ /**
+ * A test that verifies that a TaskManager which is started before the
+ * JobManager is available still registers once the JobManager comes up.
+ *
+ * The TaskManager is started first and left alone for six seconds; only then
+ * is the JobManager started. The TaskManager must confirm its registration
+ * within 30 seconds after that.
+ */
+ @Test
+ public void testDelayedRegistration() {
+ new JavaTestKit(actorSystem) {{
+ try {
+ // start a TaskManager that tries to register at the JobManager before the JobManager is
+ // available. we give it the regular JobManager akka URL
+ final ActorRef taskManager = startTaskManager(JobManager.getLocalJobManagerAkkaURL(),
+ new Configuration());
+ // let it try for a bit
+ Thread.sleep(6000);
+
+ // now start the JobManager, with the regular akka URL
+ final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+
+ // check that the TaskManager is registered
+ Future<Object> responseFuture = Patterns.ask(
+ taskManager,
+ TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ 30000);
+
+ Object response = Await.result(responseFuture, new FiniteDuration(30, TimeUnit.SECONDS));
+
+ // this is a hack to work around the way Java can interact with scala case objects:
+ // the response is compared by class against the confirmation message
+ Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
+ assertTrue(response != null && confirmClass.isAssignableFrom(response.getClass()));
+
+ // clean up the actors started by this test
+ stopActor(taskManager);
+ stopActor(jobManager);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }};
+ }
+
+ /**
+ * Tests that the TaskManager shuts down when it cannot register at the
+ * JobManager within the given maximum duration.
+ *
+ * Unfortunately, this test does not give good error messages.
+ * (I have not figured out how to get any better message out of the
+ * Akka TestKit than "ask timeout exception".)
+ *
+ * In any case: an "ask timeout exception" here means that the TaskManager
+ * did not shut down after its registration timeout expired.
+ */
+ @Test
+ public void testShutdownAfterRegistrationDurationExpired() {
+ new JavaTestKit(actorSystem) {{
+ try {
+ // registration timeout of 500 milliseconds
+ Configuration tmConfig = new Configuration();
+ tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms");
+
+ // start the taskManager actor; no JobManager is started in this test
+ final ActorRef taskManager = startTaskManager(JobManager.getLocalJobManagerAkkaURL(), tmConfig);
+
+ // make sure it terminates in time, since it cannot register at a JobManager
+ watch(taskManager);
+ new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+
+ @Override
+ protected void run() {
+ expectTerminated(taskManager);
+ }
+ };
+
+ // NOTE(review): the actor has already been observed as terminated at this
+ // point, so this kill looks like a safeguard only - confirm it is needed
+ stopActor(taskManager);
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }};
+ }
+
+ /**
+ * Make sure that the TaskManager keeps trying to register, even after
+ * registration attempts have been refused.
+ *
+ * The test kit's own actor plays the JobManager: it receives the first
+ * registration message, answers with a refusal, and then expects a second
+ * registration message.
+ */
+ @Test
+ public void testTaskManagerResumesConnectAfterRefusedRegistration() {
+ new JavaTestKit(actorSystem) {{
+ try {
+ // we make the test actor (the test kit) the JobManager to intercept
+ // the messages
+ final ActorRef taskManager = startTaskManager(getTestActor());
+
+ // check and decline initial registration
+ new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+ @Override
+ protected void run() {
+ // the TaskManager should try to register
+ expectMsgClass(RegisterTaskManager.class);
+
+ // we decline the registration
+ getLastSender().tell(new RefuseRegistration("test reason"), getTestActor());
+ }
+ };
+
+ // the TaskManager should wait a bit and retry; allow twice the delay
+ // that the TaskManager uses after a refused registration
+ FiniteDuration maxDelay = (FiniteDuration) TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
+ new Within(maxDelay) {
+
+ @Override
+ protected void run() {
+ expectMsgClass(RegisterTaskManager.class);
+ }
+ };
+
+ stopActor(taskManager);
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }};
+ }
+
+ /**
+ * Validate that the TaskManager attempts to re-connect after it lost the connection
+ * to the JobManager.
+ *
+ * Two fake JobManagers are used one after the other under the same actor name.
+ * Both only forward what they receive to the test actor, which accepts the
+ * registrations on their behalf.
+ */
+ @Test
+ public void testTaskManagerResumesConnectAfterJobManagerFailure() {
+ new JavaTestKit(actorSystem) {{
+ try {
+ final Props fakeJmProps = Props.create(ForwardingActor.class, getTestActor());
+ final String jobManagerName = "FAKE_JOB_MANAGER";
+
+ final ActorRef fakeJobManager1 = actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+
+ // the fake JobManager forwards every message to the test actor (the test kit),
+ // which lets us intercept the messages sent by the TaskManager
+ final ActorRef taskManager = startTaskManager(fakeJobManager1);
+
+ // validate initial registration
+ new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+ @Override
+ protected void run() {
+ // the TaskManager should try to register
+ expectMsgClass(RegisterTaskManager.class);
+
+ // we accept the registration, using the fake JobManager as the sender
+ taskManager.tell(new AcknowledgeRegistration(fakeJobManager1, new InstanceID(), 45234),
+ fakeJobManager1);
+ }
+ };
+
+ // kill the first forwarding JobManager
+ watch(fakeJobManager1);
+ stopActor(fakeJobManager1);
+
+ // wait for the killing to be completed
+ new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+
+ @Override
+ protected void run() {
+ expectTerminated(fakeJobManager1);
+ }
+ };
+
+ // now start the second fake JobManager and expect that
+ // the TaskManager registers again
+ // the second fake JM needs to have the same actor URL
+ final ActorRef fakeJobManager2 = actorSystem.actorOf(fakeJmProps, jobManagerName);
+
+ // expect the next registration
+ new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+
+ @Override
+ protected void run() {
+ expectMsgClass(RegisterTaskManager.class);
+
+ // we accept the registration, using the second fake JobManager as the sender
+ taskManager.tell(new AcknowledgeRegistration(fakeJobManager2, new InstanceID(), 45234),
+ fakeJobManager2);
+ }
+ };
+
+ stopActor(taskManager);
+ stopActor(fakeJobManager2);
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }};
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utility Functions
+ // --------------------------------------------------------------------------------------------
+
+ private static ActorRef startJobManager() throws Exception {
+ // start the actors. don't give names, so they get generated names and we
+ // avoid conflicts with the actor names
+ return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+ }
+
+ private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
+ return startTaskManager(jobManager.path().toString(), new Configuration());
+ }
+
+ private static ActorRef startTaskManager(String jobManagerUrl, Configuration config) throws Exception {
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+
+ return TaskManager.startTaskManagerComponentsAndActor(
+ config, actorSystem, "localhost",
+ NONE_STRING, // no actor name -> random
+ new Some<String>(jobManagerUrl), // job manager path
+ true, // local network stack only
+ TaskManager.class);
+ }
+
+ private static void stopActor(ActorRef actor) {
+ actor.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utility Actor that only forwards messages
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Actor that passes every message it receives on to a fixed recipient,
+ * keeping the original sender of the message.
+ */
+ public static class ForwardingActor extends UntypedActor {
+
+ /** The actor that receives all forwarded messages. */
+ private final ActorRef recipient;
+
+ public ForwardingActor(ActorRef target) {
+ recipient = target;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ // telling with the current sender is what forwarding amounts to
+ recipient.tell(message, getSender());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
new file mode 100644
index 0000000..131549b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+public class TaskManagerComponentsStartupShutdownTest {
+
+ /**
+ * Makes sure that all components are shut down when the TaskManager
+ * actor is shut down.
+ *
+ * The memory manager, I/O manager and network environment are created by hand
+ * and passed to the TaskManager actor, so that the test can inspect their
+ * state after the actor system has terminated.
+ */
+ @Test
+ public void testComponentsStartupShutdown() {
+
+ final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
+ final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+ final int BUFFER_SIZE = 32 * 1024;
+
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 s");
+ config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
+
+ ActorSystem actorSystem = null;
+ try {
+ actorSystem = AkkaUtils.createLocalActorSystem(config);
+
+ final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+
+ // create the components for the TaskManager manually
+ final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
+ TMP_DIR,
+ 1000000,
+ timeout,
+ Option.<FiniteDuration>empty(),
+ 1,
+ config);
+
+ final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
+ 32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty());
+
+ final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
+
+ final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE);
+ final IOManager ioManager = new IOManagerAsync(TMP_DIR);
+ final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
+ final int numberOfSlots = 1;
+
+ // create the task manager
+ final Props tmProps = Props.create(TaskManager.class,
+ tmConfig, connectionInfo, jobManager.path().toString(),
+ memManager, ioManager, network, numberOfSlots);
+
+ final ActorRef taskManager = actorSystem.actorOf(tmProps);
+
+ new JavaTestKit(actorSystem) {{
+
+ // wait for the TaskManager to be registered
+ // NOTE(review): the bound below is 5000 seconds, not 5000 milliseconds -
+ // confirm that such a long limit is intended
+ new Within(new FiniteDuration(5000, TimeUnit.SECONDS)) {
+ @Override
+ protected void run() {
+ taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ getTestActor());
+
+ expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+ }
+ };
+ }};
+
+ // the components should now all be initialized
+ assertTrue(network.isAssociated());
+
+ // kill the TaskManager and the JobManager actors
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+
+ // shut down the actor system and wait until it has terminated
+ actorSystem.shutdown();
+ actorSystem.awaitTermination();
+ // cleared so that the finally block does not shut it down a second time
+ actorSystem = null;
+
+ // now that the TaskManager is shut down, the components should be shut down as well
+ assertFalse(network.isAssociated());
+ assertTrue(network.isShutdown());
+ assertTrue(ioManager.isProperlyShutDown());
+ assertTrue(memManager.isShutdown());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ // only reached with a live actor system if the test failed part-way
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
new file mode 100644
index 0000000..d243fbf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Validates that the TaskManager startup properly obeys the configuration
+ * values.
+ */
+public class TaskManagerConfigurationTest {
+
+ /**
+ * Checks that a host name set under the TaskManager host name key is the
+ * host name returned by the network interface and port selection.
+ */
+ @Test
+ public void testUsePreconfiguredNetworkInterface() {
+ final String expectedHost = "testhostname";
+
+ try {
+ Configuration tmConfig = new Configuration();
+ tmConfig.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, expectedHost);
+
+ Tuple2<String, Object> selection =
+ TaskManager.selectNetworkInterfaceAndPort(tmConfig, "localhost", 7891);
+ String selectedHost = selection._1();
+
+ // the selected host must be exactly the configured one
+ assertEquals(expectedHost, selectedHost);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Checks the port returned by the network interface and port selection:
+ * 0 when no port is configured, the configured value when one is set, and
+ * an IllegalConfigurationException for the invalid values -1 and 100000.
+ */
+ @Test
+ public void testActorSystemPortConfig() {
+ final int explicitPort = 22551;
+ final int[] invalidPorts = { -1, 100000 };
+
+ try {
+ // a pre-configured hostname speeds up the test (no interface selection)
+ Configuration tmConfig = new Configuration();
+ tmConfig.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+
+ // no port configured: automatic port
+ Object autoPort = TaskManager.selectNetworkInterfaceAndPort(tmConfig, "localhost", 7891)._2();
+ assertEquals(0, autoPort);
+
+ // a pre-defined port is returned as configured
+ tmConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, explicitPort);
+ Object selectedPort = TaskManager.selectNetworkInterfaceAndPort(tmConfig, "localhost", 7891)._2();
+ assertEquals(explicitPort, selectedPort);
+
+ // each invalid port must be rejected with a configuration exception
+ for (int invalidPort : invalidPorts) {
+ try {
+ tmConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, invalidPort);
+ TaskManager.selectNetworkInterfaceAndPort(tmConfig, "localhost", 7891);
+ fail("should fail with an exception");
+ }
+ catch (IllegalConfigurationException expected) {
+ // this is the expected outcome
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Checks that the network interface selection returns a non-null host name
+ * when no host name is configured. A local server socket serves as the
+ * address handed to the selection; the test is skipped if that socket
+ * cannot be opened.
+ */
+ @Test
+ public void testNetworkInterfaceSelection() {
+ final String targetHost = "localhost";
+ ServerSocket listener;
+
+ // open a server port to allow the system to connect
+ try {
+ listener = new ServerSocket(0, 50, InetAddress.getByName(targetHost));
+ }
+ catch (IOException e) {
+ // an UnknownHostException (a kind of IOException) may happen if disconnected,
+ // other I/O errors may happen in certain test setups. skip the test in both cases.
+ System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
+ return;
+ }
+
+ try {
+ Configuration emptyConfig = new Configuration();
+ int targetPort = listener.getLocalPort();
+
+ assertNotNull(TaskManager.selectNetworkInterfaceAndPort(emptyConfig, targetHost, targetPort)._1());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ try {
+ listener.close();
+ }
+ catch (IOException e) {
+ // ignore shutdown errors
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index f4ee52f..3e65916 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -139,7 +139,7 @@ public class TaskManagerProcessReapingTest {
// wait for max 5 seconds for the process to terminate
{
long now = System.currentTimeMillis();
- long deadline = now + 5000;
+ long deadline = now + 10000;
while (now < deadline && isProcessAlive(taskManagerProcess)) {
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 640ccc3..e736a55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
@@ -48,14 +49,17 @@ import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -93,10 +97,12 @@ public class TaskManagerTest {
@Test
public void testSetupTaskManager() {
new JavaTestKit(system){{
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
try {
- ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class));
+ jobManager = system.actorOf(Props.create(SimpleJobManager.class));
- final ActorRef tm = createTaskManager(jobManager);
+ taskManager = createTaskManager(jobManager);
JobID jid = new JobID();
JobVertexID vid = new JobVertexID();
@@ -108,11 +114,12 @@ public class TaskManagerTest {
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
- new Within(duration("1 seconds")){
+ final ActorRef tmClosure = taskManager;
+ new Within(duration("2 seconds")) {
@Override
protected void run() {
- tm.tell(new SubmitTask(tdd), getRef());
+ tmClosure.tell(new SubmitTask(tdd), getRef());
expectMsgEquals(new TaskOperationResult(eid, true));
}
};
@@ -121,16 +128,27 @@ public class TaskManagerTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ // shut down the actors
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
}};
}
@Test
public void testJobSubmissionAndCanceling() {
new JavaTestKit(system){{
- try {
- ActorRef jobManager = system.actorOf(Props.create(SimpleJobManager.class));
- final ActorRef tm = createTaskManager(jobManager);
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+ try {
+ jobManager = system.actorOf(Props.create(SimpleJobManager.class));
+ taskManager = createTaskManager(jobManager);
final JobID jid1 = new JobID();
final JobID jid2 = new JobID();
@@ -153,6 +171,7 @@ public class TaskManagerTest {
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), 0);
+ final ActorRef tm = taskManager;
final FiniteDuration d = duration("1 second");
new Within(d) {
@@ -220,21 +239,34 @@ public class TaskManagerTest {
}
}
};
- }catch(Exception e){
+ }
+ catch(Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
-
+ finally {
+ // shut down the actors
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
}};
}
@Test
public void testGateChannelEdgeMismatch() {
new JavaTestKit(system){{
+
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
try {
- ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class));
+ jobManager = system.actorOf(Props.create(SimpleJobManager.class));
- final ActorRef tm = createTaskManager(jm);
+ taskManager = createTaskManager(jobManager);
+ final ActorRef tm = taskManager;
final JobID jid = new JobID();
@@ -260,127 +292,152 @@ public class TaskManagerTest {
@Override
protected void run() {
- try {
- tm.tell(new SubmitTask(tdd1), getRef());
- tm.tell(new SubmitTask(tdd2), getRef());
- TaskOperationResult result = expectMsgClass(TaskOperationResult.class);
- assertFalse(result.success());
- assertEquals(eid1, result.executionID());
+ tm.tell(new SubmitTask(tdd1), getRef());
+ tm.tell(new SubmitTask(tdd2), getRef());
- result = expectMsgClass(TaskOperationResult.class);
- assertFalse(result.success());
- assertEquals(eid2, result.executionID());
+ TaskOperationResult result = expectMsgClass(TaskOperationResult.class);
+ assertFalse(result.success());
+ assertEquals(eid1, result.executionID());
- tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
- getRef());
- tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
- getRef());
+ result = expectMsgClass(TaskOperationResult.class);
+ assertFalse(result.success());
+ assertEquals(eid2, result.executionID());
- expectMsgEquals(true);
- expectMsgEquals(true);
+ tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+ getRef());
+ tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+ getRef());
- tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
- Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
- .ResponseRunningTasks.class).asJava();
+ expectMsgEquals(true);
+ expectMsgEquals(true);
- assertEquals(0, tasks.size());
- }catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
+ .ResponseRunningTasks.class).asJava();
+
+ assertEquals(0, tasks.size());
}
};
- }catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ // shut down the actors
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
}};
}
@Test
public void testRunJobWithForwardChannel() {
new JavaTestKit(system){{
- final JobID jid = new JobID();
- JobVertexID vid1 = new JobVertexID();
- JobVertexID vid2 = new JobVertexID();
-
- final ExecutionAttemptID eid1 = new ExecutionAttemptID();
- final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+ try {
+ final JobID jid = new JobID();
- ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
- final ActorRef tm = createTaskManager(jm);
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
- IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+ final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+ final ExecutionAttemptID eid2 = new ExecutionAttemptID();
- List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+ jobManager = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
- InputGateDeploymentDescriptor ircdd =
- new InputGateDeploymentDescriptor(
- new IntermediateDataSetID(),
- 0, new InputChannelDeploymentDescriptor[]{
- new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
- }
- );
+ taskManager = createTaskManager(jobManager);
+ final ActorRef tm = taskManager;
- final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
- new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
+ IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
- final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
- new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.singletonList(ircdd),
- new ArrayList<BlobKey>(), 0);
+ List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
- final FiniteDuration d = duration("1 second");
+ InputGateDeploymentDescriptor ircdd =
+ new InputGateDeploymentDescriptor(
+ new IntermediateDataSetID(),
+ 0, new InputChannelDeploymentDescriptor[]{
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+ }
+ );
- new Within(d) {
+ final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+ new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+ irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
- @Override
- protected void run() {
- try {
- tm.tell(new SubmitTask(tdd1), getRef());
- expectMsgEquals(new TaskOperationResult(eid1, true));
- tm.tell(new SubmitTask(tdd2), getRef());
- expectMsgEquals(new TaskOperationResult(eid2, true));
+ final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+ new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.singletonList(ircdd),
+ new ArrayList<BlobKey>(), 0);
- tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
- Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
- .class).asJava();
+ final FiniteDuration d = duration("1 second");
- Task t1 = tasks.get(eid1);
- Task t2 = tasks.get(eid2);
+ new Within(d) {
- // wait until the tasks are done. rare thread races may cause the tasks to be done before
- // we get to the check, so we need to guard the check
- if (t1 != null) {
- Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
- timeout);
- Await.ready(response, d);
- }
+ @Override
+ protected void run() {
+ try {
+ tm.tell(new SubmitTask(tdd1), getRef());
+ expectMsgEquals(new TaskOperationResult(eid1, true));
+ tm.tell(new SubmitTask(tdd2), getRef());
+ expectMsgEquals(new TaskOperationResult(eid2, true));
- if (t2 != null) {
- Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
- timeout);
- Await.ready(response, d);
- assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
- }
+ tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
+ .class).asJava();
+
+ Task t1 = tasks.get(eid1);
+ Task t2 = tasks.get(eid2);
+
+ // wait until the tasks are done. rare thread races may cause the tasks to be done before
+ // we get to the check, so we need to guard the check
+ if (t1 != null) {
+ Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+ timeout);
+ Await.ready(response, d);
+ }
- tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
- tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
- .class).asJava();
+ if (t2 != null) {
+ Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+ timeout);
+ Await.ready(response, d);
+ assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
+ }
- assertEquals(0, tasks.size());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
+ .class).asJava();
+ assertEquals(0, tasks.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+ };
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ // shut down the actors
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
}
- };
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
}};
}
@@ -390,109 +447,128 @@ public class TaskManagerTest {
// this tests creates two tasks. the sender sends data, and fails to send the
// state update back to the job manager
// the second one blocks to be canceled
-
new JavaTestKit(system){{
- final JobID jid = new JobID();
- JobVertexID vid1 = new JobVertexID();
- JobVertexID vid2 = new JobVertexID();
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+ try {
+ final JobID jid = new JobID();
- final ExecutionAttemptID eid1 = new ExecutionAttemptID();
- final ExecutionAttemptID eid2 = new ExecutionAttemptID();
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
- ActorRef jm = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
- final ActorRef tm = createTaskManager(jm);
+ final ExecutionAttemptID eid1 = new ExecutionAttemptID();
+ final ExecutionAttemptID eid2 = new ExecutionAttemptID();
- IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+ jobManager = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
+ taskManager = createTaskManager(jobManager);
+ final ActorRef tm = taskManager;
- List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+ IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
- InputGateDeploymentDescriptor ircdd =
- new InputGateDeploymentDescriptor(
- new IntermediateDataSetID(),
- 0, new InputChannelDeploymentDescriptor[]{
- new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
- }
- );
+ List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
- final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
- new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
- irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), 0);
+ InputGateDeploymentDescriptor ircdd =
+ new InputGateDeploymentDescriptor(
+ new IntermediateDataSetID(),
+ 0, new InputChannelDeploymentDescriptor[]{
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
+ }
+ );
- final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
- new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.singletonList(ircdd),
- new ArrayList<BlobKey>(), 0);
+ final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+ new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+ irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
+ new ArrayList<BlobKey>(), 0);
- final FiniteDuration d = duration("1 second");
+ final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+ new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.singletonList(ircdd),
+ new ArrayList<BlobKey>(), 0);
- new Within(d){
+ final FiniteDuration d = duration("1 second");
- @Override
- protected void run() {
- try {
- // deploy sender before receiver, so the target is online when the sender requests the connection info
- tm.tell(new SubmitTask(tdd2), getRef());
- tm.tell(new SubmitTask(tdd1), getRef());
+ new Within(d){
- expectMsgEquals(new TaskOperationResult(eid2, true));
- expectMsgEquals(new TaskOperationResult(eid1, true));
+ @Override
+ protected void run() {
+ try {
+ // deploy receiver before sender, so the target is online when the sender requests the connection info
+ tm.tell(new SubmitTask(tdd2), getRef());
+ tm.tell(new SubmitTask(tdd1), getRef());
- tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
- Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
- .ResponseRunningTasks.class).asJava();
+ expectMsgEquals(new TaskOperationResult(eid2, true));
+ expectMsgEquals(new TaskOperationResult(eid1, true));
- Task t1 = tasks.get(eid1);
- Task t2 = tasks.get(eid2);
+ tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
+ .ResponseRunningTasks.class).asJava();
- tm.tell(new CancelTask(eid2), getRef());
- expectMsgEquals(new TaskOperationResult(eid2, true));
+ Task t1 = tasks.get(eid1);
+ Task t2 = tasks.get(eid2);
- if (t2 != null) {
- Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
- timeout);
- Await.ready(response, d);
- }
+ tm.tell(new CancelTask(eid2), getRef());
+ expectMsgEquals(new TaskOperationResult(eid2, true));
- if (t1 != null) {
- if (t1.getExecutionState() == ExecutionState.RUNNING) {
- tm.tell(new CancelTask(eid1), getRef());
- expectMsgEquals(new TaskOperationResult(eid1, true));
+ if (t2 != null) {
+ Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+ timeout);
+ Await.ready(response, d);
}
- Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
- timeout);
- Await.ready(response, d);
- }
- tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
- tasks = expectMsgClass(TestingTaskManagerMessages
- .ResponseRunningTasks.class).asJava();
+ if (t1 != null) {
+ if (t1.getExecutionState() == ExecutionState.RUNNING) {
+ tm.tell(new CancelTask(eid1), getRef());
+ expectMsgEquals(new TaskOperationResult(eid1, true));
+ }
+ Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+ timeout);
+ Await.ready(response, d);
+ }
- assertEquals(0, tasks.size());
- }catch(Exception e){
- e.printStackTrace();
- fail(e.getMessage());
+ tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ tasks = expectMsgClass(TestingTaskManagerMessages
+ .ResponseRunningTasks.class).asJava();
+
+ assertEquals(0, tasks.size());
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+ };
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ // shut down the actors
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
}
- };
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
}};
}
// --------------------------------------------------------------------------------------------
- public static class SimpleJobManager extends UntypedActor{
+ public static class SimpleJobManager extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
- if(message instanceof RegistrationMessages.RegisterTaskManager){
+ if (message instanceof RegistrationMessages.RegisterTaskManager) {
final InstanceID iid = new InstanceID();
- getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1,
- Option.<ActorRef>apply(null)),
- getSelf());
- }else if(message instanceof JobManagerMessages.UpdateTaskExecutionState){
+ final ActorRef self = getSelf();
+ getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+ }
+ else if(message instanceof TaskMessages.UpdateTaskExecutionState){
getSender().tell(true, getSelf());
}
}
@@ -514,7 +590,7 @@ public class TaskManagerTest {
@Override
public void onReceive(Object message) throws Exception{
- if (message instanceof JobManagerMessages.UpdateTaskExecutionState) {
+ if (message instanceof TaskMessages.UpdateTaskExecutionState) {
getSender().tell(false, getSelf());
} else {
super.onReceive(message);
@@ -538,14 +614,24 @@ public class TaskManagerTest {
}
}
- public static ActorRef createTaskManager(ActorRef jm) {
- Configuration cfg = new Configuration();
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+ public static ActorRef createTaskManager(ActorRef jobManager) {
+ ActorRef taskManager = null;
+ try {
+ Configuration cfg = new Configuration();
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
- String jobManagerURL = jm.path().toString();
+ Option<String> jobMangerUrl = Option.apply(jobManager.path().toString());
- ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
- jobManagerURL, cfg, system);
+ taskManager = TaskManager.startTaskManagerComponentsAndActor(
+ cfg, system, "localhost",
+ Option.<String>empty(),
+ jobMangerUrl,
+ true, TestingTaskManager.class);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Could not create test TaskManager: " + e.getMessage());
+ }
Future<Object> response = Patterns.ask(taskManager,
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
@@ -554,8 +640,9 @@ public class TaskManagerTest {
FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
Await.ready(response, d);
}
- catch(Exception e) {
- throw new RuntimeException("Exception while waiting for the task manager registration.", e);
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception while waiting for the task manager registration: " + e.getMessage());
}
return taskManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 689b22d..3a8fcd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -46,7 +46,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 9c329d1..8c8ce06 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -31,14 +31,14 @@ import org.junit.Test
import scala.concurrent.duration.Duration
+/**
+ * Tests that a lookup of a local JobManager fails within a given timeout if the JobManager
+ * actor is not reachable.
+ */
class JobManagerConnectionTest {
private val timeout = 1000
- /**
- * Tests that a lookup of a local JobManager fails within a given timeout if the JobManager
- * actor is not reachable.
- */
@Test
def testResolveUnreachableActorLocalHost() : Unit = {
// startup a test actor system listening at an arbitrary address
@@ -54,7 +54,7 @@ class JobManagerConnectionTest {
}
val endpoint = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), freePort)
- val config = getConfigWithLowTimeout()
+ val config = createConfigWithLowTimeout()
mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
() => {
@@ -90,7 +90,7 @@ class JobManagerConnectionTest {
try {
// some address that is not running a JobManager
val endpoint = new InetSocketAddress(InetAddress.getByName("10.254.254.254"), 2)
- val config = getConfigWithLowTimeout()
+ val config = createConfigWithLowTimeout()
mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
() => {
@@ -114,7 +114,7 @@ class JobManagerConnectionTest {
}
}
- private def getConfigWithLowTimeout() : Configuration = {
+ private def createConfigWithLowTimeout() : Configuration = {
val config = new Configuration()
config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT,
Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s")
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
deleted file mode 100644
index 58905ef..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import java.net.InetAddress
-
-import akka.actor._
-import akka.testkit.{TestKit, ImplicitSender}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
-RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingUtils
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import scala.concurrent.duration._
-
-import scala.language.postfixOps
-
-class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with
-ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
-
- def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
- override def afterAll(): Unit = {
- TestKit.shutdownActorSystem(system)
- }
-
- "The JobManager" should {
- "notify already registered TaskManagers" in {
-
- val jm = TestingUtils.startTestingJobManager(_system)
-
- val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
- val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
- jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
-
- expectMsgType[AcknowledgeRegistration]
- expectMsgType[AlreadyRegistered]
- }
- } finally {
- jm ! Kill
- }
- }
- }
-
- "The TaskManager" should {
- "shutdown if its registration is refused by the JobManager" in {
-
- val tm = TestingUtils.startTestingTaskManager(self, _system)
-
- watch(tm)
-
- try{
- within(TestingUtils.TESTING_DURATION) {
- expectMsgType[RegisterTaskManager]
- tm ! RefuseRegistration("Testing connection refusal")
-
- expectTerminated(tm)
- }
- }
- }
-
- "ignore RefuseRegistration messages after it has been successfully registered" in {
-
- val tm = TestingUtils.startTestingTaskManager(self, _system)
-
- try {
- ignoreMsg{
- case _: Heartbeat => true
- }
- within(TestingUtils.TESTING_DURATION) {
- expectMsgType[RegisterTaskManager]
-
- tm ! AcknowledgeRegistration(new InstanceID(), 42, None)
-
- tm ! RefuseRegistration("Should be ignored")
-
- // Check if the TaskManager is still alive
- tm ! Identify(1)
-
- expectMsgType[ActorIdentity]
-
- }
- } finally {
- tm ! Kill
- }
- }
-
- "shutdown after the maximum registration duration has been exceeded" in {
-
- val config = new Configuration()
- config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second")
-
- val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
- self.path.toString, config, _system)
-
- watch(tm)
-
- try {
- ignoreMsg{
- case _: RegisterTaskManager => true
- }
- within(2 seconds) {
- expectTerminated(tm)
- }
- } finally {
- tm ! Kill
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
new file mode 100644
index 0000000..409e98d
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.net.InetAddress
+
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
+import org.junit.Assert.{assertNotEquals, assertNotNull}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/**
+ * Tests for the JobManager's behavior when a TaskManager solicits registration.
+ * It also tests the JobManager's response to heartbeats from TaskManagers it does
+ * not know.
+ */
+class TaskManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+ def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ "The JobManager" should {
+
+ "assign a TaskManager a unique instance ID" in {
+ val jm = startTestingJobManager(_system)
+
+ val tmDummy1 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+ val tmDummy2 = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+
+ try {
+ val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
+ val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+
+ val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+ var id1: InstanceID = null
+ var id2: InstanceID = null
+
+ // task manager 1
+ within(1 second) {
+ jm ! RegisterTaskManager(tmDummy1, connectionInfo1, hardwareDescription, 1)
+
+ val response = receiveOne(1 second)
+ response match {
+ case AcknowledgeRegistration(_, id, _) => id1 = id
+ case _ => fail("Wrong response message: " + response)
+ }
+ }
+
+ // task manager 2
+ within(1 second) {
+ jm ! RegisterTaskManager(tmDummy2, connectionInfo2, hardwareDescription, 1)
+
+ val response = receiveOne(1 second)
+ response match {
+ case AcknowledgeRegistration(_, id, _) => id2 = id
+ case _ => fail("Wrong response message: " + response)
+ }
+ }
+
+ assertNotNull(id1)
+ assertNotNull(id2)
+ assertNotEquals(id1, id2)
+ }
+ finally {
+ tmDummy1 ! Kill
+ tmDummy2 ! Kill
+ jm ! Kill
+ }
+ }
+
+ "handle repeated registration calls" in {
+
+ val jm = startTestingJobManager(_system)
+ val tmDummy = _system.actorOf(Props(classOf[TaskManagerRegistrationTest.DummyActor]))
+
+ try {
+ val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+ val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+ within(1 second) {
+ jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+ jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+ jm ! RegisterTaskManager(tmDummy, connectionInfo, hardwareDescription, 1)
+
+ expectMsgType[AcknowledgeRegistration]
+ expectMsgType[AlreadyRegistered]
+ expectMsgType[AlreadyRegistered]
+ }
+ } finally {
+ tmDummy ! Kill
+ jm ! Kill
+ }
+ }
+ }
+
+ private def startTestingJobManager(system: ActorSystem): ActorRef = {
+ val (jm: ActorRef, _) = JobManager.startJobManagerActors(
+ new Configuration(), _system, None, None)
+ jm
+ }
+}
+
+object TaskManagerRegistrationTest {
+
+ /** Simple dummy actor that swallows all messages */
+ class DummyActor extends Actor {
+ override def receive: Receive = {
+ case _ =>
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 4a72694..f87e151 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -67,7 +67,17 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
- TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName,
- singleActorSystem, numTaskManagers == 1, classOf[TestingTaskManager])
+ val jobManagerPath: Option[String] = if (singleActorSystem) {
+ Some(jobManagerActor.path.toString)
+ } else {
+ None
+ }
+
+ TaskManager.startTaskManagerComponentsAndActor(configuration, system,
+ HOSTNAME,
+ Some(tmActorName),
+ jobManagerPath,
+ numTaskManagers == 1,
+ classOf[TestingTaskManager])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index a47e4e7..702e34c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -22,9 +22,12 @@ import akka.actor.{Terminated, ActorRef}
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.runtime.messages.Messages.Disconnect
-import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
-import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
+import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask
+import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
@@ -35,11 +38,15 @@ import scala.language.postfixOps
/**
* Subclass of the [[TaskManager]] to support testing messages
*/
-class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
+class TestingTaskManager(config: TaskManagerConfiguration,
+ connectionInfo: InstanceConnectionInfo,
jobManagerAkkaURL: String,
- taskManagerConfig: TaskManagerConfiguration,
- networkConfig: NetworkEnvironmentConfiguration)
- extends TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConfig) {
+ memoryManager: DefaultMemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int)
+ extends TaskManager(config, connectionInfo, jobManagerAkkaURL,
+ memoryManager, ioManager, network, numberOfSlots) {
val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
@@ -81,12 +88,12 @@ class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
bcVarManager.getNumberOfVariablesWithReferences)
case RequestNumActiveConnections =>
- networkEnvironment match {
- case Some(ne) => sender ! ResponseNumActiveConnections(
- ne.getConnectionManager.getNumberOfActiveConnections)
-
- case None => sender ! ResponseNumActiveConnections(0)
- }
+ val numActive = if (network.isAssociated) {
+ network.getConnectionManager.getNumberOfActiveConnections
+ } else {
+ 0
+ }
+ sender ! ResponseNumActiveConnections(numActive)
case NotifyWhenJobRemoved(jobID) =>
if(runningTasks.values.exists(_.getJobID == jobID)){
@@ -129,7 +136,7 @@ class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
if (!disconnectDisabled) {
super.receiveWithLogMessages(msg)
- val jobManager = sender
+ val jobManager = sender()
waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
_ foreach {
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 9bb3d0b..63dce31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.testingUtils
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.CallingThreadDispatcher
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.taskmanager.TaskManager
import scala.concurrent.duration._
@@ -57,34 +56,18 @@ object TestingUtils {
def getDefaultTestingActorSystemConfig = testConfig
- def startTestingJobManager(system: ActorSystem): ActorRef = {
- val config = new Configuration()
-
- val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ ,
- executionRetries, delayBetweenRetries,
- timeout, archiveCount) = JobManager.createJobManagerComponents(config)
-
- val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
- val archive = system.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
- val jobManagerProps = Props(new JobManager(config, instanceManager, scheduler,
- libraryCacheManager, archive, accumulatorManager, None, executionRetries,
- delayBetweenRetries, timeout) with TestingJobManager)
-
- system.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
- }
def startTestingTaskManagerWithConfiguration(hostname: String,
jobManagerURL: String,
config: Configuration,
- system: ActorSystem) = {
+ system: ActorSystem) : ActorRef = {
- val (tmConfig, netConfig, connectionInfo, _) =
- TaskManager.parseTaskManagerConfiguration(config, hostname, true, false)
- val tmProps = Props(classOf[TestingTaskManager], connectionInfo,
- jobManagerURL, tmConfig, netConfig)
- system.actorOf(tmProps)
+ TaskManager.startTaskManagerComponentsAndActor(config, system,
+ hostname,
+ None, // random actor name
+ Some(jobManagerURL), // job manager
+ true, classOf[TestingTaskManager])
}
def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
@@ -92,11 +75,7 @@ object TestingUtils {
val jmURL = jobManager.path.toString
val config = new Configuration()
- val (tmConfig, netConfig, connectionInfo, _) =
- TaskManager.parseTaskManagerConfiguration(config, "localhost", true, true)
-
- val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig)
- system.actorOf(tmProps)
+ startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
}
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 2641bc1..5b80531 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -26,8 +26,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.jobmanager.BarrierAck;
-import org.apache.flink.runtime.jobmanager.StateBarrierAck;
+import org.apache.flink.runtime.messages.CheckpointingMessages;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;
@@ -116,12 +115,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
if (configuration.getStateMonitoring() && !states.isEmpty()) {
getEnvironment().getJobManager().tell(
- new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+ new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
new LocalStateHandle(states)), ActorRef.noSender());
} else {
getEnvironment().getJobManager().tell(
- new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+ new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 6df8099..ddfffee 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -110,8 +110,14 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
val localExecution = numTaskManagers == 1
- TaskManager.startTaskManagerActor(config, system, HOSTNAME,
- TaskManager.TASK_MANAGER_NAME + index, singleActorSystem, localExecution,
+ val jobManagerAkkaUrl: Option[String] = if (singleActorSystem) {
+ Some(jobManagerActor.path.toString)
+ } else {
+ None
+ }
+
+ TaskManager.startTaskManagerComponentsAndActor(config, system, HOSTNAME,
+ Some(TaskManager.TASK_MANAGER_NAME + index), jobManagerAkkaUrl, localExecution,
classOf[TestingTaskManager])
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 4311e9c..2901bf8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -139,7 +139,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
// we wait for the JobManager to have the two TaskManagers available
// wait for at most 20 seconds
- waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+ waitUntilNumTaskManagersAreRegistered(jmActor, 2, 30000);
// the program will set a marker file in each of its parallel tasks once they are ready, so that
// this coordinating code is aware of this.
@@ -174,7 +174,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
// we wait for the third TaskManager to register (20 seconds max)
- waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+ waitUntilNumTaskManagersAreRegistered(jmActor, 3, 30000);
// kill one of the previous TaskManagers, triggering a failure and recovery
taskManagerProcess1.destroy();
@@ -369,7 +369,7 @@ public abstract class AbstractProcessFailureRecoveryTest {
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- TaskManager.runTaskManager(cfg, TaskManager.class);
+ TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
// wait forever
Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 6fa7a61..3556ec1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -87,7 +87,7 @@ public class YarnTaskManagerRunner {
@Override
public Object run() {
try {
- TaskManager.runTaskManager(configuration, YarnTaskManager.class);
+ TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, YarnTaskManager.class);
}
catch (Throwable t) {
LOG.error("Error while starting the TaskManager", t);