You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/05/18 21:51:10 UTC
reef git commit: [REEF-1321] Adding TaskManager for IMRU fault
tolerant
Repository: reef
Updated Branches:
refs/heads/master 865cdb516 -> 326eae21a
[REEF-1321] Adding TaskManager for IMRU fault tolerant
This change:
* adds TaskManager class
* adds test cases for it
JIRA:
[REEF-1321](https://issues.apache.org/jira/browse/REEF-1321)
Pull request:
This closes #1002
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/326eae21
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/326eae21
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/326eae21
Branch: refs/heads/master
Commit: 326eae21a217073cdd6eedba6086f1e7ed93659d
Parents: 865cdb5
Author: Julia Wang <ju...@apache.org>
Authored: Thu May 12 14:32:40 2016 -0700
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Wed May 18 14:50:07 2016 -0700
----------------------------------------------------------------------
.../Org.Apache.REEF.IMRU.Tests.csproj | 1 +
.../TestTaskManager.cs | 679 +++++++++++++++++++
.../OnREEF/Driver/TaskInfo.cs | 58 ++
.../OnREEF/Driver/TaskManager.cs | 466 +++++++++++++
.../Org.Apache.REEF.IMRU.csproj | 2 +
5 files changed, 1206 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 415a6db..43e8dfe 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -54,6 +54,7 @@ under the License.
<Compile Include="TestActiveContextManager.cs" />
<Compile Include="TestEvaluatorManager.cs" />
<Compile Include="TestSystemStates.cs" />
+ <Compile Include="TestTaskManager.cs" />
<Compile Include="TestTaskStates.cs" />
</ItemGroup>
<ItemGroup>
http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
new file mode 100644
index 0000000..319f541
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -0,0 +1,679 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using NSubstitute;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Xunit;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+ /// <summary>
+ /// Test cases for TaskManager
+ /// </summary>
+ public sealed class TestTaskManager
+ {
+ private const string MapperTaskIdPrefix = "MapperTaskIdPrefix";
+ private const string MasterTaskId = "MasterTaskId";
+ private const string EvaluatorIdPrefix = "EvaluatorId";
+ private const string ContextIdPrefix = "ContextId";
+ private const int TotalNumberOfTasks = 3;
+
+ /// <summary>
+ /// Tests adding all the tasks, then resetting the TaskManager
+ /// </summary>
+ [Fact]
+ public void TestValidAddAndReset()
+ {
+ var taskManager = TaskManagerWithTasksAdded();
+ Assert.True(taskManager.AreAllTasksInState(TaskState.TaskNew));
+ Assert.Equal(TotalNumberOfTasks, taskManager.NumberOfTasks);
+ taskManager.Reset();
+ Assert.Equal(0, taskManager.NumberOfTasks);
+ Assert.Equal(0, taskManager.NumberOfAppErrors());
+ }
+
+ /// <summary>
+ /// Tests SubmitTasks after adding all the tasks to the TaskManager
+ /// </summary>
+ [Fact]
+ public void TestSubmitTasks()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+ Assert.True(taskManager.AreAllTasksInState(TaskState.TaskSubmitted));
+ }
+
+ /// <summary>
+ /// Tests SubmitTasks with a missing mapper task
+ /// </summary>
+ [Fact]
+ public void TestMissingMapperTasksSubmit()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+ taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+
+ Action submit = () => taskManager.SubmitTasks();
+ Assert.Throws<IMRUSystemException>(submit);
+ }
+
+ /// <summary>
+ /// Tests SubmitTasks with a missing master task
+ /// </summary>
+ [Fact]
+ public void TestMissingMasterTaskSubmit()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+ taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
+
+ Action submit = () => taskManager.SubmitTasks();
+ Assert.Throws<IMRUSystemException>(submit);
+ }
+
+ /// <summary>
+ /// Tests adding all mapper tasks without the master task
+ /// </summary>
+ [Fact]
+ public void NoMasterTask()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+ taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
+ Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 3, MockConfig(), CreateMockActiveContext(3));
+ Assert.Throws<IMRUSystemException>(add);
+ }
+
+ /// <summary>
+ /// Tests adding more tasks than expected
+ /// </summary>
+ [Fact]
+ public void ExceededTotalNumber()
+ {
+ var taskManager = TaskManagerWithTasksAdded();
+ Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 4, MockConfig(), CreateMockActiveContext(4));
+ Assert.Throws<IMRUSystemException>(add);
+ }
+
+ /// <summary>
+ /// Tests adding a task with a duplicated task id and a duplicated master task id
+ /// </summary>
+ [Fact]
+ public void DuplicatedTaskIdInAdd()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+ taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+ Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+ Assert.Throws<IMRUSystemException>(add);
+ add = () => taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(1));
+ Assert.Throws<IMRUSystemException>(add);
+ }
+
+ /// <summary>
+ /// Tests adding tasks with a null configuration or a null active context
+ /// </summary>
+ [Fact]
+ public void NullArguments()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+
+ Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, null, CreateMockActiveContext(1));
+ Assert.Throws<IMRUSystemException>(add);
+
+ add = () => taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), null);
+ Assert.Throws<IMRUSystemException>(add);
+ }
+
+ /// <summary>
+ /// Tests passing invalid arguments when creating a TaskManager
+ /// </summary>
+ [Fact]
+ public void InvalidArgumentsInCreatingTaskManger()
+ {
+ Action taskManager = () => CreateTaskManager(0, MasterTaskId);
+ Assert.Throws<IMRUSystemException>(taskManager);
+
+ taskManager = () => CreateTaskManager(1, null);
+ Assert.Throws<IMRUSystemException>(taskManager);
+ }
+
+ /// <summary>
+ /// Tests whether all tasks correctly reach the Running and Completed states
+ /// </summary>
+ [Fact]
+ public void TestCompletingTasks()
+ {
+ var taskManager = TaskManagerWithTasksRunning();
+ Assert.True(taskManager.AreAllTasksInState(TaskState.TaskRunning));
+
+ taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
+ taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
+ taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+ Assert.True(taskManager.AreAllTasksInState(TaskState.TaskCompleted));
+ }
+
+ /// <summary>
+ /// Tests closing running tasks
+ /// </summary>
+ [Fact]
+ public void TestClosingRunningTasks()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+ var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+ taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+ Assert.True(taskManager.AreAllTasksInState(TaskState.TaskWaitingForClose));
+ }
+
+ /// <summary>
+ /// Tests recording failed tasks after all the tasks are running
+ /// </summary>
+ [Fact]
+ public void TestFailedRunningTasks()
+ {
+ var taskManager = TaskManagerWithTasksRunning();
+
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError));
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError));
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests the number of application errors
+ /// </summary>
+ [Fact]
+ public void TestAppError()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError));
+ Assert.Equal(1, taskManager.NumberOfAppErrors());
+ }
+
+ /// <summary>
+ /// Tests failed tasks in various event sequences
+ /// </summary>
+ [Fact]
+ public void TestFailedTasks()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2));
+
+ // This task fails because of an evaluator failure, then fails by itself
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // no state change should happen in this case
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // This task fails by itself first, then fails because of an evaluator failure
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2);
+ Assert.Equal(TaskState.TaskFailedByGroupCommunication, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // the master task starts running during the system failure, so the driver closes it
+ var masterRunningTask = CreateMockRunningTask(MasterTaskId);
+ taskManager.RecordRunningTaskDuringSystemFailure(masterRunningTask, TaskManager.CloseTaskByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver));
+ Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests the scenario where, after all the tasks are running, a task fails first, then all running tasks are closed
+ /// </summary>
+ [Fact]
+ public void TestFailedTasksAfterAllTasksAreRunnigScenario()
+ {
+ var taskManager = TaskManagerWithTasksRunning();
+
+ // A task fails first
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+ Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // the system is shutting down, close all other tasks
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // task 2 is killed by driver
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+
+ // master task is killed by driver
+ var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests the scenario where, after all the tasks are running, an evaluator fails first, then a task fails with a communication error
+ /// </summary>
+ [Fact]
+ public void TestFailedEvaluatorThenFailedTaskAfterTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksRunning();
+
+ // Evaluator error
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // the system is shutting down, close all other tasks
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+ // Another task may fail with a group communication error while the system is shutting down
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+ Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // The master task receives the close event from the driver and sends a failed task event back to the driver
+ var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
+ Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests the scenario where, after all the tasks are running, a task fails first, then an evaluator fails
+ /// </summary>
+ [Fact]
+ public void TestFailedTasksThenFailedEvaluatorAfterAllTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksRunning();
+
+ // A task fails first
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+ Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // the system is shutting down, close all other tasks
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // An Evaluator fails during shutdown; as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
+ Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // master task gets a communication error before it receives the close event; as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver
+ var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
+ Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests an evaluator failure before any task is running, after all the tasks are submitted
+ /// </summary>
+ [Fact]
+ public void TestFailedEvaluatorBeforeAnyTaskIsRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ // Evaluator error
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // no task is running yet, so there is nothing to close
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // task 2 is now running, close it
+ var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+ taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // master task is running, close it
+ var masterTask = CreateMockRunningTask(MasterTaskId);
+ taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+ // received task failure caused by the close event
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+
+ // received task failure caused by the close event
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests an evaluator failure for a running task before all the tasks are running
+ /// </summary>
+ [Fact]
+ public void TestFailedEvaluatorOnRunningTaskBeforeAllTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+ // Evaluator error
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // the running master task should be closed
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // task 2 is now running, close it
+ var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+ taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests an evaluator failure for a non-running task before all the tasks are running
+ /// </summary>
+ [Fact]
+ public void TestFailedEvaluatorOnNoRunningTaskBeforeAllTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+ // Evaluator error
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
+ taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2));
+ Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // Send the close event to the master task and task 1
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
+
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests a task failure before any task is running, after all the tasks are submitted.
+ /// </summary>
+ [Fact]
+ public void TestFailedTaskBeforeAnyTaskIsRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ // Task 1 fails with a system error
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+ Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // no task is running yet, so there is nothing to close
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // task 2 is now running, close it
+ var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+ taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // master task is running, close it
+ var masterTask = CreateMockRunningTask(MasterTaskId);
+ taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+ // Task 2 could fail with a group communication error before receiving the close event
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+
+ // master task fails because it receives the close event
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests a running task failure before all the tasks are running
+ /// </summary>
+ [Fact]
+ public void TestFailedRunningTaskBeforeAllTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+ // Task 1 fails with a system error while running
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+ Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
+
+ // task 1 has failed and no other task is running yet, so there is nothing to close
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // task 2 is now running, close it
+ var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+ taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // master task is running, close it
+ var masterTask = CreateMockRunningTask(MasterTaskId);
+ taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+ // Task 2 could fail with a group communication error before receiving the close event
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+
+ // master task fails because it receives the close event
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Tests a non-running task failure before all the tasks are running
+ /// </summary>
+ [Fact]
+ public void TestFailedNoRunningTaskBeforeAllTasksAreRunningScenario()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+ // Task 2 fails with a system error before it is running
+ var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
+ taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2);
+ Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 2));
+
+ // task 1 is running, send the close event to it
+ taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // master task is running, close it
+ var masterTask = CreateMockRunningTask(MasterTaskId);
+ taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver);
+ Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+ // Task 1 could fail with a group communication error before receiving the close event
+ var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
+
+ // The master task could fail with a group communication error as well
+ var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
+ taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+
+ Assert.True(taskManager.AllInFinalState());
+ }
+
+ /// <summary>
+ /// Creates a TaskManager with the specified numTasks and masterTaskId
+ /// </summary>
+ /// <param name="numTasks">Total number of tasks the TaskManager expects, including the master task</param>
+ /// <param name="masterTaskId">Id of the master task</param>
+ /// <returns>The newly created TaskManager</returns>
+ private static TaskManager CreateTaskManager(int numTasks = TotalNumberOfTasks, string masterTaskId = MasterTaskId)
+ {
+ var taskManager = new TaskManager(numTasks, masterTaskId);
+ return taskManager;
+ }
+
+ /// <summary>
+ /// Creates a TaskManager and adds one master task and two mapper tasks
+ /// </summary>
+ /// <returns>A TaskManager with all the tasks added</returns>
+ private static TaskManager TaskManagerWithTasksAdded()
+ {
+ var taskManager = CreateTaskManager();
+ taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+ taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+ taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
+
+ return taskManager;
+ }
+
+ /// <summary>
+ /// Creates a TaskManager with all the tasks submitted
+ /// </summary>
+ /// <returns>A TaskManager with all the tasks submitted</returns>
+ private static TaskManager TaskManagerWithTasksSubmitted()
+ {
+ var taskManager = TaskManagerWithTasksAdded();
+ taskManager.SubmitTasks();
+
+ return taskManager;
+ }
+
+ /// <summary>
+ /// Creates a TaskManager with all the tasks running
+ /// </summary>
+ /// <returns>A TaskManager with all the tasks recorded as running</returns>
+ private static TaskManager TaskManagerWithTasksRunning()
+ {
+ var taskManager = TaskManagerWithTasksSubmitted();
+ taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+ taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2));
+
+ return taskManager;
+ }
+
+ /// <summary>
+ /// Creates a mock IActiveContext
+ /// </summary>
+ /// <param name="id">Suffix used to build the context id and the evaluator id</param>
+ /// <returns>A mock IActiveContext</returns>
+ private static IActiveContext CreateMockActiveContext(int id)
+ {
+ var mockActiveContext = Substitute.For<IActiveContext>();
+ mockActiveContext.Id.Returns(ContextIdPrefix + id);
+ mockActiveContext.EvaluatorId.Returns(EvaluatorIdPrefix + ContextIdPrefix + id);
+ return mockActiveContext;
+ }
+
+ /// <summary>
+ /// Creates a mock FailedTask with specified taskId and error message
+ /// </summary>
+ /// <param name="taskId">Id of the failed task</param>
+ /// <param name="errorMsg">Message of the failed task, e.g. one of the TaskManager error constants</param>
+ /// <returns>A mock IFailedTask</returns>
+ private static IFailedTask CreateMockFailedTask(string taskId, string errorMsg)
+ {
+ IFailedTask failedTask = Substitute.For<IFailedTask>();
+ failedTask.Id.Returns(taskId);
+ failedTask.Message.Returns(errorMsg);
+ return failedTask;
+ }
+
+ /// <summary>
+ /// Creates a mock running task with the taskId specified
+ /// </summary>
+ /// <param name="taskId">Id of the running task</param>
+ /// <returns>A mock IRunningTask</returns>
+ private static IRunningTask CreateMockRunningTask(string taskId)
+ {
+ var runningTask = Substitute.For<IRunningTask>();
+ runningTask.Id.Returns(taskId);
+ return runningTask;
+ }
+
+ /// <summary>
+ /// Creates a mock completed task with the taskId specified
+ /// </summary>
+ /// <param name="taskId">Id of the completed task</param>
+ /// <returns>A mock ICompletedTask</returns>
+ private static ICompletedTask CreateMockCompletedTask(string taskId)
+ {
+ var completedTask = Substitute.For<ICompletedTask>();
+ completedTask.Id.Returns(taskId);
+ return completedTask;
+ }
+
+ /// <summary>
+ /// Creates a mock IFailedEvaluator with the specified IFailedTask associated
+ /// </summary>
+ /// <param name="evaluatorId">Id of the failed evaluator</param>
+ /// <param name="failedTask">Failed task reported by the failed evaluator</param>
+ /// <returns>A mock IFailedEvaluator</returns>
+ private static IFailedEvaluator CreateMockFailedEvaluator(string evaluatorId, IFailedTask failedTask)
+ {
+ var failedEvaluator = Substitute.For<IFailedEvaluator>();
+ failedEvaluator.Id.Returns(evaluatorId);
+ failedEvaluator.FailedTask.Returns(Optional<IFailedTask>.Of(failedTask));
+ return failedEvaluator;
+ }
+
+ /// <summary>
+ /// Creates a mock IConfiguration
+ /// </summary>
+ /// <returns>An empty configuration with nothing bound</returns>
+ private static IConfiguration MockConfig()
+ {
+ return TangFactory.GetTang().NewConfigurationBuilder().Build();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
new file mode 100644
index 0000000..6ae992d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
@@ -0,0 +1,58 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Bundles what the driver needs to submit one IMRU task: the task's state machine,
+    /// its task configuration, and the active context it is submitted on.
+    /// The references are fixed at construction; the state machine itself may still change state.
+    /// </summary>
+    internal sealed class TaskInfo
+    {
+        private readonly TaskStateMachine _state;
+        private readonly IConfiguration _configuration;
+        private readonly IActiveContext _context;
+
+        /// <summary>
+        /// Wraps a task state machine, a task configuration, and the active context used for submitting the task.
+        /// </summary>
+        /// <param name="taskState">State machine that tracks the task's state.</param>
+        /// <param name="config">Configuration the task is submitted with.</param>
+        /// <param name="context">Active context the task is submitted on.</param>
+        internal TaskInfo(TaskStateMachine taskState, IConfiguration config, IActiveContext context)
+        {
+            _context = context;
+            _configuration = config;
+            _state = taskState;
+        }
+
+        /// <summary>
+        /// State machine of the task.
+        /// </summary>
+        internal TaskStateMachine TaskState
+        {
+            get
+            {
+                return _state;
+            }
+        }
+
+        /// <summary>
+        /// Configuration used when submitting the task.
+        /// </summary>
+        internal IConfiguration TaskConfiguration
+        {
+            get
+            {
+                return _configuration;
+            }
+        }
+
+        /// <summary>
+        /// Active context on which the task is submitted.
+        /// </summary>
+        internal IActiveContext ActiveContext
+        {
+            get
+            {
+                return _context;
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
new file mode 100644
index 0000000..78af207
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -0,0 +1,466 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Manages IMRU tasks: maintains each task's state, configuration and active context,
+    /// is responsible for task submission, and tracks and closes the running tasks.
+    /// </summary>
+    [NotThreadSafe]
+    internal sealed class TaskManager
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskManager));
+
+        /// <summary>
+        /// Error messages thrown in IMRU tasks when an exception happens.
+        /// GetTaskErrorEvent compares IFailedTask.Message against these values to classify a failure.
+        /// </summary>
+        internal const string TaskAppError = "TaskAppError";
+        internal const string TaskSystemError = "TaskSystemError";
+        internal const string TaskGroupCommunicationError = "TaskGroupCommunicationError";
+        internal const string TaskEvaluatorError = "TaskEvaluatorError";
+
+        /// <summary>
+        /// Message sent from the driver to an evaluator to close a running task
+        /// </summary>
+        internal const string CloseTaskByDriver = "CloseTaskByDriver";
+
+        /// <summary>
+        /// Error message in the Task exception showing that the task received the close event
+        /// </summary>
+        internal const string TaskKilledByDriver = "TaskKilledByDriver";
+
+        /// <summary>
+        /// Task information keyed by Task Id. Each TaskInfo holds the task state machine, the task configuration,
+        /// and the active context the task is submitted on.
+        /// </summary>
+        private readonly IDictionary<string, TaskInfo> _tasks = new Dictionary<string, TaskInfo>();
+
+        /// <summary>
+        /// Running tasks keyed by Task Id. A task is added when its IRunningTask event is recorded, and removed
+        /// when it completes, fails, or is requested by the driver to close.
+        /// </summary>
+        private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>();
+
+        /// <summary>
+        /// Total number of tasks expected to be added
+        /// </summary>
+        private readonly int _totalExpectedTasks;
+
+        /// <summary>
+        /// Master task Id. It is set in the IGroupCommDriver and must be the same Id used in the TaskManager.
+        /// </summary>
+        private readonly string _masterTaskId;
+
+        /// <summary>
+        /// Number of application errors received from failed tasks since construction or the last Reset()
+        /// </summary>
+        private int _numberOfAppErrors;
+
+        /// <summary>
+        /// Creates a TaskManager with the specified total number of tasks and master task id.
+        /// Throws IMRUSystemException if numTasks is smaller than or equal to 0,
+        /// or if masterTaskId is null, empty, or whitespace.
+        /// </summary>
+        /// <param name="numTasks">Total number of tasks expected to be added.</param>
+        /// <param name="masterTaskId">Id of the master task; must match the Id used in IGroupCommDriver.</param>
+        internal TaskManager(int numTasks, string masterTaskId)
+        {
+            if (numTasks <= 0)
+            {
+                Exceptions.Throw(new IMRUSystemException("Number of expected tasks must be positive"), Logger);
+            }
+
+            if (string.IsNullOrWhiteSpace(masterTaskId))
+            {
+                Exceptions.Throw(new IMRUSystemException("masterTaskId cannot be null"), Logger);
+            }
+
+            _totalExpectedTasks = numTasks;
+            _masterTaskId = masterTaskId;
+        }
+
+        /// <summary>
+        /// Adds a Task to the task collection.
+        /// Throws IMRUSystemException in the following cases, without modifying the collection:
+        ///   taskId is null or is already added
+        ///   taskConfiguration is null
+        ///   activeContext is null
+        ///   trying to add extra tasks
+        ///   adding this task would fill the collection without any Master Task in it
+        /// </summary>
+        /// <param name="taskId">Id of the task to add.</param>
+        /// <param name="taskConfiguration">Configuration used to submit the task.</param>
+        /// <param name="activeContext">Active context the task will be submitted on.</param>
+        internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
+        {
+            if (taskId == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The taskId is null."), Logger);
+            }
+
+            if (_tasks.ContainsKey(taskId))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already exists.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (taskConfiguration == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The task configuration is null."), Logger);
+            }
+
+            if (activeContext == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The context is null."), Logger);
+            }
+
+            if (NumberOfTasks >= _totalExpectedTasks)
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "Trying to add an additional Task {0}, but the total expected Task number {1} has been reached.", taskId, _totalExpectedTasks);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // Validate before mutating: if this task takes the last free slot and neither the existing tasks
+            // nor this one is the master, the collection could never become valid (no slot is left for the
+            // master). Rejecting it here keeps the collection unchanged so the master can still be added.
+            if (NumberOfTasks == _totalExpectedTasks - 1 && taskId != _masterTaskId && !MasterTaskExists())
+            {
+                Exceptions.Throw(new IMRUSystemException("There is no master task added."), Logger);
+            }
+
+            _tasks.Add(taskId, new TaskInfo(new TaskStateMachine(), taskConfiguration, activeContext));
+        }
+
+        /// <summary>
+        /// Returns the number of tasks in the task collection
+        /// </summary>
+        internal int NumberOfTasks
+        {
+            get { return _tasks.Count; }
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IRunningTask event during task submission.
+        /// Updates the task state with the RunningTask event and adds the IRunningTask to the running tasks collection.
+        /// Throws IMRUSystemException if runningTask is null, the running tasks already contain this task,
+        /// or the task collection doesn't contain this task.
+        /// </summary>
+        /// <param name="runningTask">The running task reported by the evaluator.</param>
+        internal void RecordRunningTask(IRunningTask runningTask)
+        {
+            if (runningTask == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
+            }
+
+            if (_runningTasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already running.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (!_tasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // Move the state first: if the transition is rejected, the task must not be left registered as running.
+            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
+            _runningTasks.Add(runningTask.Id, runningTask);
+        }
+
+        /// <summary>
+        /// This method is called at the beginning of the recovery.
+        /// Clears the task collection and the running task collection, and resets the number of application errors.
+        /// </summary>
+        internal void Reset()
+        {
+            _tasks.Clear();
+            _runningTasks.Clear();
+            _numberOfAppErrors = 0;
+        }
+
+        /// <summary>
+        /// This method is called when receiving an ICompletedTask event while tasks are running or the system is shutting down.
+        /// Removes the task from the running tasks (throws IMRUSystemException if it is not there)
+        /// and updates the task state with the CompletedTask event.
+        /// </summary>
+        /// <param name="completedTask">The completed task reported by the evaluator.</param>
+        internal void RecordCompletedTask(ICompletedTask completedTask)
+        {
+            RemoveRunningTask(completedTask.Id);
+            UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IFailedTask event during task submission or while running.
+        /// Removes the task from the running tasks if it was running, then updates the task state
+        /// with the error event derived from the failed task's message.
+        /// </summary>
+        /// <param name="failedTask">The failed task reported by the evaluator.</param>
+        internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
+        {
+            // Remove the task from the running tasks if it exists there; a submitted task may not be running yet.
+            _runningTasks.Remove(failedTask.Id);
+            UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IFailedTask event while the system is shutting down.
+        /// If the task is waiting to be closed (it received the close command from the driver), the task state is
+        /// updated with the ClosedTask event.
+        /// Otherwise, unless the task is already in TaskFailedByEvaluatorFailure, the task state is updated based on
+        /// the error received (e.g. a communication error or any other application or system error).
+        /// </summary>
+        /// <param name="failedTask">The failed task reported by the evaluator.</param>
+        internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
+        {
+            var taskState = GetTaskState(failedTask.Id);
+            if (taskState == StateMachine.TaskState.TaskWaitingForClose)
+            {
+                UpdateState(failedTask.Id, TaskStateEvent.ClosedTask);
+            }
+            else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure)
+            {
+                UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+            }
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IFailedEvaluator event while a task is submitted, running,
+        /// or the system is shutting down.
+        /// If the evaluator carries a failed task: removes that task from the running tasks when its state is
+        /// TaskRunning, then updates its state with the FailedTaskEvaluatorError event.
+        /// Does nothing if the evaluator has no failed task.
+        /// </summary>
+        /// <param name="failedEvaluator">The failed evaluator reported by the resource manager.</param>
+        internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
+        {
+            if (failedEvaluator.FailedTask.IsPresent())
+            {
+                var taskId = failedEvaluator.FailedTask.Value.Id;
+                var taskState = GetTaskState(taskId);
+                if (taskState == StateMachine.TaskState.TaskRunning)
+                {
+                    RemoveRunningTask(taskId);
+                }
+
+                UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+            }
+        }
+
+        /// <summary>
+        /// Removes a task from the running tasks collection.
+        /// Throws IMRUSystemException if the task is not in the running tasks collection.
+        /// </summary>
+        /// <param name="taskId">Id of the task to remove.</param>
+        private void RemoveRunningTask(string taskId)
+        {
+            if (!_runningTasks.Remove(taskId))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+        }
+
+        /// <summary>
+        /// Updates the task state for a given taskId based on the task event.
+        /// Throws IMRUSystemException if the task is not in the task collection.
+        /// </summary>
+        /// <param name="taskId">Id of the task whose state machine is advanced.</param>
+        /// <param name="taskEvent">Event fed to the task's state machine.</param>
+        private void UpdateState(string taskId, TaskStateEvent taskEvent)
+        {
+            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are running: every task is in TaskRunning state and
+        /// the running tasks collection holds the total expected number of tasks.
+        /// </summary>
+        /// <returns>true if all expected tasks are running.</returns>
+        internal bool AreAllTasksRunning()
+        {
+            return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
+                _runningTasks.Count == _totalExpectedTasks;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are completed: every task is in TaskCompleted state, the total expected
+        /// number of tasks was added, and no task is still registered as running.
+        /// </summary>
+        /// <returns>true if all expected tasks are completed.</returns>
+        internal bool AreAllTasksCompleted()
+        {
+            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) && _tasks.Count == _totalExpectedTasks && _runningTasks.Count == 0;
+        }
+
+        /// <summary>
+        /// This method is called when receiving either an IFailedEvaluator or an IFailedTask event.
+        /// The driver sends the close message to every running task, moves each of them to the waiting-for-close
+        /// state, and clears the running task collection at the end.
+        /// If all the tasks were running, the number of running tasks closed here should be _totalExpectedTasks - 1.
+        /// If this happens before all the tasks are running, the number should be smaller than _totalExpectedTasks - 1.
+        /// If this happens when no task is running, the number could be 0.
+        /// </summary>
+        /// <param name="closeMessage">Message sent to each running task, UTF-8 encoded.</param>
+        internal void CloseAllRunningTasks(string closeMessage)
+        {
+            Logger.Log(Level.Verbose, "Closing [{0}] running tasks.", _runningTasks.Count);
+            foreach (var runningTask in _runningTasks.Values)
+            {
+                runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
+                UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
+            }
+            _runningTasks.Clear();
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IRunningTask event while the system is shutting down or has failed.
+        /// In this case the task is not added to the running tasks collection. Instead:
+        ///   the task state is updated with the RunningTask event (the transition must be valid from the current state),
+        ///   the IRunningTask is closed with the close message,
+        ///   then the task state is updated with the WaitingTaskToClose event.
+        /// Throws IMRUSystemException if runningTask is null or the task is already in the running task collection.
+        /// </summary>
+        /// <param name="runningTask">The running task reported by the evaluator.</param>
+        /// <param name="closeMessage">Message sent to the task, UTF-8 encoded.</param>
+        internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
+        {
+            if (runningTask == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
+            }
+
+            if (_runningTasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] is already in running tasks.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
+            runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
+            UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
+        }
+
+        /// <summary>
+        /// Gets the error event based on the information in IFailedTask.
+        /// Currently the Message in IFailedTask is used to distinguish different types of errors;
+        /// any unrecognized message is treated as a system error.
+        /// Side effect: increments the application error counter when the message is TaskAppError.
+        /// </summary>
+        /// <param name="failedTask">The failed task whose message is classified.</param>
+        /// <returns>The task state event corresponding to the error.</returns>
+        private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask)
+        {
+            // NOTE(review): TaskEvaluatorError is declared above but not matched here, so it falls through to
+            // FailedTaskSystemError - confirm this is intended.
+            switch (failedTask.Message)
+            {
+                case TaskAppError:
+                    _numberOfAppErrors++;
+                    return TaskStateEvent.FailedTaskAppError;
+                case TaskSystemError:
+                    return TaskStateEvent.FailedTaskSystemError;
+                case TaskGroupCommunicationError:
+                    return TaskStateEvent.FailedTaskCommunicationError;
+                default:
+                    return TaskStateEvent.FailedTaskSystemError;
+            }
+        }
+
+        /// <summary>
+        /// Returns the number of application errors caused by failed tasks
+        /// </summary>
+        /// <returns>The application error count since construction or the last Reset().</returns>
+        internal int NumberOfAppErrors()
+        {
+            return _numberOfAppErrors;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are in final states
+        /// </summary>
+        /// <returns>true if every task's state machine reports a final state (vacuously true when there are no tasks).</returns>
+        internal bool AllInFinalState()
+        {
+            return _tasks.Values.All(info => info.TaskState.IsFinalState());
+        }
+
+        /// <summary>
+        /// Gets the current state of the task.
+        /// Throws IMRUSystemException if the task is not in the task collection.
+        /// </summary>
+        /// <param name="taskId">Id of the task.</param>
+        /// <returns>The task's current state.</returns>
+        internal TaskState GetTaskState(string taskId)
+        {
+            return GetTaskInfo(taskId).TaskState.CurrentState;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are in the state specified.
+        /// For example, pass TaskState.TaskRunning to check if all the tasks are in TaskRunning state.
+        /// </summary>
+        /// <param name="taskState">The state every task is expected to be in.</param>
+        /// <returns>true if every task is in the given state (vacuously true when there are no tasks).</returns>
+        internal bool AreAllTasksInState(TaskState taskState)
+        {
+            return _tasks.Values.All(info => info.TaskState.CurrentState == taskState);
+        }
+
+        /// <summary>
+        /// Submits all the tasks, each on its own active context with its own configuration,
+        /// and updates each task state with the SubmittedTask event.
+        /// Tasks are submitted only after all the tasks are added to the collection and the master task exists;
+        /// otherwise IMRUSystemException is thrown.
+        /// </summary>
+        internal void SubmitTasks()
+        {
+            if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", NumberOfTasks, _totalExpectedTasks);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // MoveNext only mutates the TaskStateMachine, not the dictionary, so enumerating _tasks here is safe.
+            foreach (var entry in _tasks)
+            {
+                var taskInfo = entry.Value;
+                taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
+                taskInfo.TaskState.MoveNext(TaskStateEvent.SubmittedTask);
+            }
+        }
+
+        /// <summary>
+        /// Checks if the master task has been added
+        /// </summary>
+        /// <returns>true if the task collection contains the master task Id.</returns>
+        private bool MasterTaskExists()
+        {
+            return _tasks.ContainsKey(_masterTaskId);
+        }
+
+        /// <summary>
+        /// Gets the TaskInfo for the given taskId.
+        /// Throws IMRUSystemException if the task is not in the task collection.
+        /// </summary>
+        /// <param name="taskId">Id of the task.</param>
+        /// <returns>The TaskInfo registered for the task.</returns>
+        private TaskInfo GetTaskInfo(string taskId)
+        {
+            TaskInfo taskInfo;
+            if (!_tasks.TryGetValue(taskId, out taskInfo))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] does not exist in the task collection.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            return taskInfo;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 2f5cf03..b51638a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -88,6 +88,8 @@ under the License.
<Compile Include="OnREEF\Driver\StateMachine\TaskStateEvent.cs" />
<Compile Include="OnREEF\Driver\StateMachine\TaskState.cs" />
<Compile Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" />
+ <Compile Include="OnREEF\Driver\TaskInfo.cs" />
+ <Compile Include="OnREEF\Driver\TaskManager.cs" />
<Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
<Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
<Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" />