You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/05/18 21:51:10 UTC

reef git commit: [REEF-1321] Adding TaskManager for IMRU fault tolerant

Repository: reef
Updated Branches:
  refs/heads/master 865cdb516 -> 326eae21a


[REEF-1321] Adding TaskManager for IMRU fault tolerant

This change:
 * adds TaskManager class
 * adds test cases for it

JIRA:
  [REEF-1321](https://issues.apache.org/jira/browse/REEF-1321)

Pull request:
  This closes #1002


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/326eae21
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/326eae21
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/326eae21

Branch: refs/heads/master
Commit: 326eae21a217073cdd6eedba6086f1e7ed93659d
Parents: 865cdb5
Author: Julia Wang <ju...@apache.org>
Authored: Thu May 12 14:32:40 2016 -0700
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Wed May 18 14:50:07 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.IMRU.Tests.csproj           |   1 +
 .../TestTaskManager.cs                          | 679 +++++++++++++++++++
 .../OnREEF/Driver/TaskInfo.cs                   |  58 ++
 .../OnREEF/Driver/TaskManager.cs                | 466 +++++++++++++
 .../Org.Apache.REEF.IMRU.csproj                 |   2 +
 5 files changed, 1206 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 415a6db..43e8dfe 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -54,6 +54,7 @@ under the License.
     <Compile Include="TestActiveContextManager.cs" />
     <Compile Include="TestEvaluatorManager.cs" />
     <Compile Include="TestSystemStates.cs" />
+    <Compile Include="TestTaskManager.cs" />
     <Compile Include="TestTaskStates.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
new file mode 100644
index 0000000..319f541
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -0,0 +1,679 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using NSubstitute;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Xunit;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    /// <summary>
+    /// Test cases for TaskManager
+    /// </summary>
+    public sealed class TestTaskManager
+    {
+        // Prefix for mapper task ids; the tests append an index (1, 2, ...) to build each id.
+        private const string MapperTaskIdPrefix = "MapperTaskIdPrefix";
+        // Id used for the master task in every test.
+        private const string MasterTaskId = "MasterTaskId";
+        // Prefix of mock evaluator ids; CreateMockActiveContext combines it with ContextIdPrefix and an index.
+        private const string EvaluatorIdPrefix = "EvaluatorId";
+        // Prefix of mock active context ids; CreateMockActiveContext appends an index.
+        private const string ContextIdPrefix = "ContextId";
+        // Number of tasks the TaskManager under test expects: one master task plus two mapper tasks.
+        private const int TotalNumberOfTasks = 3;
+
+        /// <summary>
+        /// Verifies that after adding the master and all mapper tasks every task is in the New state,
+        /// and that after Reset the manager reports no tasks and no application errors.
+        /// </summary>
+        [Fact]
+        public void TestValidAddAndReset()
+        {
+            var manager = TaskManagerWithTasksAdded();
+
+            var allNew = manager.AreAllTasksInState(TaskState.TaskNew);
+            Assert.True(allNew);
+            Assert.Equal(TotalNumberOfTasks, manager.NumberOfTasks);
+
+            manager.Reset();
+
+            Assert.Equal(0, manager.NumberOfTasks);
+            Assert.Equal(0, manager.NumberOfAppErrors());
+        }
+
+        /// <summary>
+        /// Verifies that once all expected tasks are added and SubmitTasks is called, every task is in the Submitted state.
+        /// </summary>
+        [Fact]
+        public void TestSubmitTasks()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            var allSubmitted = manager.AreAllTasksInState(TaskState.TaskSubmitted);
+            Assert.True(allSubmitted);
+        }
+
+        /// <summary>
+        /// Verifies that SubmitTasks throws when only the master task and one of the two mapper tasks were added.
+        /// </summary>
+        [Fact]
+        public void TestMissingMapperTasksSubmit()
+        {
+            var manager = CreateTaskManager();
+            var firstMapperId = MapperTaskIdPrefix + 1;
+
+            manager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+            manager.AddTask(firstMapperId, MockConfig(), CreateMockActiveContext(1));
+
+            Action submitIncompleteSet = () => manager.SubmitTasks();
+            Assert.Throws<IMRUSystemException>(submitIncompleteSet);
+        }
+
+        /// <summary>
+        /// Verifies that SubmitTasks throws when two mapper tasks were added but the master task was not.
+        /// </summary>
+        [Fact]
+        public void TestMissingMasterTaskSubmit()
+        {
+            var manager = CreateTaskManager();
+            var firstMapperId = MapperTaskIdPrefix + 1;
+            var secondMapperId = MapperTaskIdPrefix + 2;
+
+            manager.AddTask(firstMapperId, MockConfig(), CreateMockActiveContext(1));
+            manager.AddTask(secondMapperId, MockConfig(), CreateMockActiveContext(2));
+
+            Action submitWithoutMaster = () => manager.SubmitTasks();
+            Assert.Throws<IMRUSystemException>(submitWithoutMaster);
+        }
+
+        /// <summary>
+        /// Verifies that, with three tasks expected and no master task added yet,
+        /// adding a third mapper task throws.
+        /// </summary>
+        [Fact]
+        public void NoMasterTask()
+        {
+            var manager = CreateTaskManager();
+            manager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1));
+            manager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2));
+
+            Action addThirdMapper = () => manager.AddTask(MapperTaskIdPrefix + 3, MockConfig(), CreateMockActiveContext(3));
+            Assert.Throws<IMRUSystemException>(addThirdMapper);
+        }
+
+        /// <summary>
+        /// Verifies that AddTask throws once the expected number of tasks has already been added.
+        /// </summary>
+        [Fact]
+        public void ExceededTotalNumber()
+        {
+            var fullManager = TaskManagerWithTasksAdded();
+
+            Action addOneTooMany = () => fullManager.AddTask(MapperTaskIdPrefix + 4, MockConfig(), CreateMockActiveContext(4));
+            Assert.Throws<IMRUSystemException>(addOneTooMany);
+        }
+
+        /// <summary>
+        /// Verifies that AddTask rejects a mapper task id that was already added,
+        /// and rejects adding the master task id a second time.
+        /// </summary>
+        [Fact]
+        public void DuplicatedTaskIdInAdd()
+        {
+            var manager = CreateTaskManager();
+            var mapperId = MapperTaskIdPrefix + 1;
+            manager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+            manager.AddTask(mapperId, MockConfig(), CreateMockActiveContext(1));
+
+            Action addDuplicateMapper = () => manager.AddTask(mapperId, MockConfig(), CreateMockActiveContext(1));
+            Assert.Throws<IMRUSystemException>(addDuplicateMapper);
+
+            Action addDuplicateMaster = () => manager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(1));
+            Assert.Throws<IMRUSystemException>(addDuplicateMaster);
+        }
+
+        /// <summary>
+        /// Verifies that AddTask throws when given a null task configuration or a null active context.
+        /// </summary>
+        [Fact]
+        public void NullArguments()
+        {
+            var manager = CreateTaskManager();
+            manager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+
+            Action addWithoutConfig = () => manager.AddTask(MapperTaskIdPrefix + 1, null, CreateMockActiveContext(1));
+            Assert.Throws<IMRUSystemException>(addWithoutConfig);
+
+            Action addWithoutContext = () => manager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), null);
+            Assert.Throws<IMRUSystemException>(addWithoutContext);
+        }
+
+        /// <summary>
+        /// Verifies that constructing a TaskManager throws for zero expected tasks or a null master task id.
+        /// </summary>
+        [Fact]
+        public void InvalidArgumentsInCreatingTaskManger()
+        {
+            Action createWithNoTasks = () => CreateTaskManager(0, MasterTaskId);
+            Assert.Throws<IMRUSystemException>(createWithNoTasks);
+
+            Action createWithoutMasterId = () => CreateTaskManager(1, null);
+            Assert.Throws<IMRUSystemException>(createWithoutMasterId);
+        }
+
+        /// <summary>
+        /// Verifies that all tasks are Running after being recorded as running,
+        /// and Completed after each of them is recorded as completed.
+        /// </summary>
+        [Fact]
+        public void TestCompletingTasks()
+        {
+            var manager = TaskManagerWithTasksRunning();
+            Assert.True(manager.AreAllTasksInState(TaskState.TaskRunning));
+
+            // Mappers complete first, the master last
+            string[] completionOrder = { MapperTaskIdPrefix + 1, MapperTaskIdPrefix + 2, MasterTaskId };
+            foreach (var taskId in completionOrder)
+            {
+                manager.RecordCompletedTask(CreateMockCompletedTask(taskId));
+            }
+
+            Assert.True(manager.AreAllTasksInState(TaskState.TaskCompleted));
+        }
+
+        /// <summary>
+        /// Verifies that every task is waiting for close after two tasks are recorded as running,
+        /// a third is recorded as running during a system failure, and all running tasks are closed.
+        /// </summary>
+        [Fact]
+        public void TestClosingRunningTasks()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+
+            foreach (var taskId in new[] { MasterTaskId, MapperTaskIdPrefix + 1 })
+            {
+                manager.RecordRunningTask(CreateMockRunningTask(taskId));
+            }
+
+            // Mapper 2 only reports Running while the system is handling a failure
+            var lateRunningTask = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+            manager.RecordRunningTaskDuringSystemFailure(lateRunningTask, TaskManager.CloseTaskByDriver);
+
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Assert.True(manager.AreAllTasksInState(TaskState.TaskWaitingForClose));
+        }
+
+        /// <summary>
+        /// Verifies that once all tasks are running, recording one application error, one group communication
+        /// error and one system error (one per task) leaves every task in a final state.
+        /// </summary>
+        [Fact]
+        public void TestFailedRunningTasks()
+        {
+            var manager = TaskManagerWithTasksRunning();
+
+            var failures = new[]
+            {
+                CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError),
+                CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError),
+                CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError)
+            };
+
+            foreach (var failure in failures)
+            {
+                manager.RecordFailedTaskDuringRunningOrSubmissionState(failure);
+            }
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies that NumberOfAppErrors counts the failure reported with the application error message
+        /// but not the one reported with the system error message.
+        /// </summary>
+        [Fact]
+        public void TestAppError()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+
+            var appFailure = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError);
+            manager.RecordFailedTaskDuringRunningOrSubmissionState(appFailure);
+
+            var systemFailure = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError);
+            manager.RecordFailedTaskDuringRunningOrSubmissionState(systemFailure);
+
+            Assert.Equal(1, manager.NumberOfAppErrors());
+        }
+
+        /// <summary>
+        /// Verifies the resulting task states when failure events arrive in different orders:
+        /// evaluator failure before task failure, task failure before evaluator failure,
+        /// and a task that starts running during a failure and then reports being killed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedTasks()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            manager.RecordRunningTask(CreateMockRunningTask(mapper1));
+            manager.RecordRunningTask(CreateMockRunningTask(mapper2));
+
+            // Mapper 1: its evaluator fails first, then the task failure itself arrives
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", mapper1Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper1));
+
+            // The late task failure must not change the state
+            manager.RecordFailedTaskDuringSystemShuttingDownState(mapper1Failure);
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper1));
+
+            // Mapper 2: the task reports a group communication failure first, then its evaluator fails
+            var mapper2Failure = CreateMockFailedTask(mapper2, TaskManager.TaskGroupCommunicationError);
+            manager.RecordFailedTaskDuringRunningOrSubmissionState(mapper2Failure);
+            Assert.Equal(TaskState.TaskFailedByGroupCommunication, manager.GetTaskState(mapper2));
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", mapper2Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper2));
+
+            // Master: starts running during shutdown, is closed, then reports being killed by the driver
+            var masterRunningTask = CreateMockRunningTask(MasterTaskId);
+            manager.RecordRunningTaskDuringSystemFailure(masterRunningTask, TaskManager.CloseTaskByDriver);
+            manager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver));
+            Assert.Equal(TaskState.TaskClosedByDriver, manager.GetTaskState(MasterTaskId));
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies that when one running task fails and the other running tasks are then closed by the driver,
+        /// every task is in a final state once the closed tasks report being killed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedTasksAfterAllTasksAreRunnigScenario()
+        {
+            var manager = TaskManagerWithTasksRunning();
+            var mapper1 = MapperTaskIdPrefix + 1;
+
+            // Mapper 1 fails with a system error while every task is running
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordFailedTaskDuringRunningOrSubmissionState(mapper1Failure);
+            Assert.Equal(TaskState.TaskFailedBySystemError, manager.GetTaskState(mapper1));
+
+            // Shutting down: ask all remaining running tasks to close
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Mapper 2 and then the master report that they were killed by the driver
+            foreach (var taskId in new[] { MapperTaskIdPrefix + 2, MasterTaskId })
+            {
+                manager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(taskId, TaskManager.TaskKilledByDriver));
+            }
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies the sequence: all tasks running, an evaluator fails, the remaining tasks are asked to close,
+        /// then one task fails with a group communication error and the master reports being killed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorThenFailedTaskAfterTasksAreRunningScenario()
+        {
+            var manager = TaskManagerWithTasksRunning();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // The evaluator running mapper 1 fails
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", mapper1Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper1));
+
+            // Shutting down: every task that is still running is asked to close
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, manager.GetTaskState(mapper2));
+            Assert.Equal(TaskState.TaskWaitingForClose, manager.GetTaskState(MasterTaskId));
+
+            // Mapper 2 hits a group communication error during shutdown
+            var mapper2Failure = CreateMockFailedTask(mapper2, TaskManager.TaskGroupCommunicationError);
+            manager.RecordFailedTaskDuringSystemShuttingDownState(mapper2Failure);
+            Assert.Equal(TaskState.TaskClosedByDriver, manager.GetTaskState(mapper2));
+
+            // The master received the close request and reports that it was killed by the driver
+            var masterFailure = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+            manager.RecordFailedTaskDuringSystemShuttingDownState(masterFailure);
+            Assert.Equal(TaskState.TaskClosedByDriver, manager.GetTaskState(MasterTaskId));
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies the sequence: all tasks running, one task fails, the remaining tasks are asked to close,
+        /// then an evaluator fails and the master reports a group communication error. Tasks already waiting
+        /// for close end up closed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedTasksThenFailedEvaluatorAfterAllTasksAreRunningScenario()
+        {
+            var manager = TaskManagerWithTasksRunning();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // Mapper 1 fails by itself first
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordFailedTaskDuringRunningOrSubmissionState(mapper1Failure);
+            Assert.Equal(TaskState.TaskFailedBySystemError, manager.GetTaskState(mapper1));
+
+            // Shutting down: ask the remaining running tasks to close
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Mapper 2's evaluator fails while the task is waiting for close, so it is recorded as closed by the driver
+            var mapper2Failure = CreateMockFailedTask(mapper2, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", mapper2Failure));
+            Assert.Equal(TaskState.TaskClosedByDriver, manager.GetTaskState(mapper2));
+
+            // The master hits a group communication error before it sees the close event;
+            // it is also waiting for close, so it is recorded as closed by the driver
+            var masterFailure = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
+            manager.RecordFailedTaskDuringSystemShuttingDownState(masterFailure);
+            Assert.Equal(TaskState.TaskClosedByDriver, manager.GetTaskState(MasterTaskId));
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies the sequence: all tasks submitted, an evaluator fails before any task runs, and the tasks
+        /// that start running afterwards are closed and then report being killed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorBeforeAnyTaskIsRunningScenario()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // The evaluator for mapper 1 fails while all tasks are only submitted
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", mapper1Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper1));
+
+            // No task is running yet at this point
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Mapper 2 starts running during the failure; it is closed as soon as it is recorded
+            var mapper2Running = CreateMockRunningTask(mapper2);
+            manager.RecordRunningTaskDuringSystemFailure(mapper2Running, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, manager.GetTaskState(mapper2));
+
+            // The master starts running as well; close it too
+            var masterRunning = CreateMockRunningTask(MasterTaskId);
+            manager.RecordRunningTaskDuringSystemFailure(masterRunning, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, manager.GetTaskState(MasterTaskId));
+
+            // Both closed tasks report back that they were killed by the driver
+            foreach (var taskId in new[] { mapper2, MasterTaskId })
+            {
+                manager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(taskId, TaskManager.TaskKilledByDriver));
+            }
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies that when the evaluator of a running task fails before all tasks are running, the remaining
+        /// tasks are closed and every task reaches a final state.
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorOnRunningTaskBeforeAllTasksAreRunningScenario()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // Only the master and mapper 1 are running
+            manager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+            manager.RecordRunningTask(CreateMockRunningTask(mapper1));
+
+            // The evaluator running mapper 1 fails
+            var mapper1Failure = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", mapper1Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper1));
+
+            // Close the tasks that are still running (the master)
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Mapper 2 starts running afterwards and is closed right away
+            var mapper2Running = CreateMockRunningTask(mapper2);
+            manager.RecordRunningTaskDuringSystemFailure(mapper2Running, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, manager.GetTaskState(mapper2));
+
+            // Both closed tasks report back that they were killed by the driver
+            foreach (var taskId in new[] { mapper2, MasterTaskId })
+            {
+                manager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(taskId, TaskManager.TaskKilledByDriver));
+            }
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Verifies that when the evaluator of a task that is not yet running fails, the running tasks are closed
+        /// and every task reaches a final state.
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorOnNoRunningTaskBeforeAllTasksAreRunningScenario()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // The master and mapper 1 are running; mapper 2 is still only submitted
+            manager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+            manager.RecordRunningTask(CreateMockRunningTask(mapper1));
+
+            // The evaluator for the not-yet-running mapper 2 fails
+            var mapper2Failure = CreateMockFailedTask(mapper2, TaskManager.TaskSystemError);
+            manager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", mapper2Failure));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, manager.GetTaskState(mapper2));
+
+            // Ask the running master and mapper 1 to close
+            manager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Both report back that they were killed by the driver
+            foreach (var taskId in new[] { mapper1, MasterTaskId })
+            {
+                manager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(taskId, TaskManager.TaskKilledByDriver));
+            }
+
+            Assert.True(manager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Tests a task that fails by itself (system error) after all tasks are submitted but before any is running.
+        /// The tasks that start running afterwards are closed by the driver. Each of them ends up closed by the
+        /// driver, whether it reports a group communication error or being killed.
+        /// </summary>
+        [Fact]
+        public void TestFailedTaskBeforeAnyTaskIsRunningScenario()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            // Task 1 fails by itself with a system error (a task failure, not an evaluator failure)
+            var failedTask1 = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+            Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(mapper1));
+
+            // No task is running yet at this point
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Task 2 starts running during failure handling; close it
+            var runningTask2 = CreateMockRunningTask(mapper2);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(mapper2));
+
+            // The master task starts running; close it
+            var runningMasterTask = CreateMockRunningTask(MasterTaskId);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningMasterTask, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+            // Task 2 may fail with a group communication error before it receives the close event;
+            // it is already waiting for close, so it is recorded as closed by the driver
+            var failedTask2 = CreateMockFailedTask(mapper2, TaskManager.TaskGroupCommunicationError);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(mapper2));
+
+            // The master task fails because it received the close event
+            var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+            Assert.True(taskManager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Tests a running task that fails by itself (system error) before all tasks are running.
+        /// The tasks that start running afterwards are closed by the driver, and each ends up closed by the driver.
+        /// </summary>
+        [Fact]
+        public void TestFailedRunningTaskBeforeAllTasksAreRunningScenario()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            taskManager.RecordRunningTask(CreateMockRunningTask(mapper1));
+
+            // Running task 1 fails by itself with a system error (a task failure, not an evaluator failure)
+            var failedTask1 = CreateMockFailedTask(mapper1, TaskManager.TaskSystemError);
+            taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1);
+            Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(mapper1));
+
+            // Task 1 has already failed, so no task is running at this point
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+            // Task 2 starts running during failure handling; close it
+            var runningTask2 = CreateMockRunningTask(mapper2);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(mapper2));
+
+            // The master task starts running; close it
+            var runningMasterTask = CreateMockRunningTask(MasterTaskId);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningMasterTask, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+            // Task 2 may fail with a group communication error before it receives the close event;
+            // it is already waiting for close, so it is recorded as closed by the driver
+            var failedTask2 = CreateMockFailedTask(mapper2, TaskManager.TaskGroupCommunicationError);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(mapper2));
+
+            // The master task fails because it received the close event
+            var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+            Assert.True(taskManager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Tests a task that is not yet running failing by itself (system error) while another task is already running.
+        /// The running task is closed, the master is closed when it starts running, and both end up closed by the
+        /// driver even though they report group communication errors.
+        /// </summary>
+        [Fact]
+        public void TestFailedNoRunningTaskBeforeAllTasksAreRunningScenario()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+            var mapper1 = MapperTaskIdPrefix + 1;
+            var mapper2 = MapperTaskIdPrefix + 2;
+
+            taskManager.RecordRunningTask(CreateMockRunningTask(mapper1));
+
+            // Task 2, which is only submitted, fails by itself with a system error (not an evaluator failure)
+            var failedTask2 = CreateMockFailedTask(mapper2, TaskManager.TaskSystemError);
+            taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2);
+            Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(mapper2));
+
+            // Task 1 is running, so it is asked to close
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(mapper1));
+
+            // The master task starts running; close it
+            var runningMasterTask = CreateMockRunningTask(MasterTaskId);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningMasterTask, TaskManager.CloseTaskByDriver);
+            Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId));
+
+            // Task 1 may fail with a group communication error before it receives the close event;
+            // it is already waiting for close, so it is recorded as closed by the driver
+            var failedTask1 = CreateMockFailedTask(mapper1, TaskManager.TaskGroupCommunicationError);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(mapper1));
+
+            // The master task may also fail with a group communication error while waiting for close
+            var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
+            taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
+
+            Assert.True(taskManager.AllInFinalState());
+        }
+
+        /// <summary>
+        /// Creates a TaskManager that expects <paramref name="numTasks"/> tasks, with <paramref name="masterTaskId"/>
+        /// as the id of the master task.
+        /// </summary>
+        /// <param name="numTasks">Total number of tasks the manager expects; defaults to TotalNumberOfTasks.</param>
+        /// <param name="masterTaskId">Id of the master task; defaults to MasterTaskId.</param>
+        /// <returns>A newly constructed TaskManager; no tasks are added to it here.</returns>
+        private static TaskManager CreateTaskManager(int numTasks = TotalNumberOfTasks, string masterTaskId = MasterTaskId)
+        {
+            return new TaskManager(numTasks, masterTaskId);
+        }
+
+        /// <summary>
+        /// Creates a TaskManager, then adds the master task on context 0 followed by mapper tasks
+        /// 1 .. TotalNumberOfTasks - 1, each on the context with the same index.
+        /// </summary>
+        /// <returns>A TaskManager holding all expected tasks, none of them submitted yet.</returns>
+        private static TaskManager TaskManagerWithTasksAdded()
+        {
+            var manager = CreateTaskManager();
+            manager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0));
+
+            for (var index = 1; index < TotalNumberOfTasks; index++)
+            {
+                manager.AddTask(MapperTaskIdPrefix + index, MockConfig(), CreateMockActiveContext(index));
+            }
+
+            return manager;
+        }
+
+        /// <summary>
+        /// Builds a TaskManager with the master and two mapper tasks added, then submits all of them.
+        /// </summary>
+        /// <returns>A TaskManager whose tasks have all been submitted.</returns>
+        private static TaskManager TaskManagerWithTasksSubmitted()
+        {
+            var manager = TaskManagerWithTasksAdded();
+            manager.SubmitTasks();
+            return manager;
+        }
+
+
+        /// <summary>
+        /// Builds a TaskManager with all tasks submitted, then records a mock running task for each of them
+        /// (master first, then the two mappers).
+        /// </summary>
+        /// <returns>A TaskManager whose tasks are all recorded as running.</returns>
+        private static TaskManager TaskManagerWithTasksRunning()
+        {
+            var manager = TaskManagerWithTasksSubmitted();
+            foreach (var taskId in new[] { MasterTaskId, MapperTaskIdPrefix + 1, MapperTaskIdPrefix + 2 })
+            {
+                manager.RecordRunningTask(CreateMockRunningTask(taskId));
+            }
+
+            return manager;
+        }
+
+        /// <summary>
+        /// Creates a substitute IActiveContext whose Id is ContextIdPrefix + id and whose
+        /// EvaluatorId is EvaluatorIdPrefix followed by that context id.
+        /// </summary>
+        /// <param name="id">Numeric suffix used to build the context and evaluator ids.</param>
+        /// <returns>The configured substitute context.</returns>
+        private static IActiveContext CreateMockActiveContext(int id)
+        {
+            var contextId = ContextIdPrefix + id;
+            var context = Substitute.For<IActiveContext>();
+            context.Id.Returns(contextId);
+            context.EvaluatorId.Returns(EvaluatorIdPrefix + contextId);
+            return context;
+        }
+
+        /// <summary>
+        /// Creates a substitute IFailedTask that reports the given task id and error message.
+        /// </summary>
+        /// <param name="taskId">Id the failed task reports.</param>
+        /// <param name="errorMsg">Message the failed task reports.</param>
+        /// <returns>The configured substitute failed task.</returns>
+        private static IFailedTask CreateMockFailedTask(string taskId, string errorMsg)
+        {
+            var failure = Substitute.For<IFailedTask>();
+            failure.Id.Returns(taskId);
+            failure.Message.Returns(errorMsg);
+            return failure;
+        }
+
+        /// <summary>
+        /// Creates a substitute IRunningTask that reports the given task id.
+        /// </summary>
+        /// <param name="taskId">Id the running task reports.</param>
+        /// <returns>The configured substitute running task.</returns>
+        private static IRunningTask CreateMockRunningTask(string taskId)
+        {
+            var task = Substitute.For<IRunningTask>();
+            task.Id.Returns(taskId);
+            return task;
+        }
+
+        /// <summary>
+        /// Creates a mock completed task with the taskId specified
+        /// </summary>
+        /// <param name="taskId">Id the completed task reports.</param>
+        /// <returns>A substitute ICompletedTask whose Id returns <paramref name="taskId"/>.</returns>
+        private static ICompletedTask CreateMockCompletedTask(string taskId)
+        {
+            var completedTask = Substitute.For<ICompletedTask>();
+            completedTask.Id.Returns(taskId);
+            return completedTask;
+        }
+
+        /// <summary>
+        /// Creates a substitute IFailedEvaluator with the given id whose FailedTask is present
+        /// and wraps the given failed task.
+        /// </summary>
+        /// <param name="evaluatorId">Id the failed evaluator reports.</param>
+        /// <param name="failedTask">Failed task associated with the evaluator.</param>
+        /// <returns>The configured substitute failed evaluator.</returns>
+        private static IFailedEvaluator CreateMockFailedEvaluator(string evaluatorId, IFailedTask failedTask)
+        {
+            var associatedTask = Optional<IFailedTask>.Of(failedTask);
+            var failedEvaluator = Substitute.For<IFailedEvaluator>();
+            failedEvaluator.Id.Returns(evaluatorId);
+            failedEvaluator.FailedTask.Returns(associatedTask);
+            return failedEvaluator;
+        }
+
+        /// <summary>
+        /// Builds an empty Tang configuration to stand in for a real task configuration.
+        /// </summary>
+        /// <returns>An empty IConfiguration.</returns>
+        private static IConfiguration MockConfig()
+        {
+            var builder = TangFactory.GetTang().NewConfigurationBuilder();
+            return builder.Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
new file mode 100644
index 0000000..6ae992d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Immutable holder that groups a task's state machine, the configuration used to submit the task,
+    /// and the active context the task is submitted on.
+    /// </summary>
+    internal sealed class TaskInfo
+    {
+        private readonly TaskStateMachine _stateMachine;
+        private readonly IConfiguration _configuration;
+        private readonly IActiveContext _context;
+
+        /// <summary>
+        /// Creates a TaskInfo wrapping the task state, task configuration, and active context for submitting the task.
+        /// </summary>
+        /// <param name="taskState">State machine that tracks the task.</param>
+        /// <param name="config">Configuration of the task.</param>
+        /// <param name="context">Active context for submitting the task.</param>
+        internal TaskInfo(TaskStateMachine taskState, IConfiguration config, IActiveContext context)
+        {
+            _context = context;
+            _configuration = config;
+            _stateMachine = taskState;
+        }
+
+        /// <summary>State machine that tracks the task.</summary>
+        internal TaskStateMachine TaskState
+        {
+            get { return _stateMachine; }
+        }
+
+        /// <summary>Configuration of the task.</summary>
+        internal IConfiguration TaskConfiguration
+        {
+            get { return _configuration; }
+        }
+
+        /// <summary>Active context for submitting the task.</summary>
+        internal IActiveContext ActiveContext
+        {
+            get { return _context; }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
new file mode 100644
index 0000000..78af207
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Manages Tasks, maintains task states and is responsible for task submission.
+    /// Every added task is tracked in a task collection; tasks reported running are additionally
+    /// tracked in a running-task collection until they complete, fail, or are requested to close.
+    /// </summary>
+    [NotThreadSafe]
+    internal sealed class TaskManager
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskManager));
+
+        /// <summary>
+        /// Error messages thrown in IMRU tasks when an exception happens
+        /// </summary>
+        internal const string TaskAppError = "TaskAppError";
+        internal const string TaskSystemError = "TaskSystemError";
+        internal const string TaskGroupCommunicationError = "TaskGroupCommunicationError";
+        internal const string TaskEvaluatorError = "TaskEvaluatorError";
+
+        /// <summary>
+        /// Message sent from driver to evaluator to close a running task
+        /// </summary>
+        internal const string CloseTaskByDriver = "CloseTaskByDriver";
+
+        /// <summary>
+        /// Error message in Task exception to show the task received close event
+        /// </summary>
+        internal const string TaskKilledByDriver = "TaskKilledByDriver";
+
+        /// <summary>
+        /// Task information keyed by task Id. Each TaskInfo holds the task state machine,
+        /// the task configuration, and the active context the task is submitted on.
+        /// </summary>
+        private readonly IDictionary<string, TaskInfo> _tasks = new Dictionary<string, TaskInfo>();
+
+        /// <summary>
+        /// Running tasks keyed by task Id. A task is added here once it is reported running and
+        /// removed when it completes, fails, or is requested to close.
+        /// </summary>
+        private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>();
+
+        /// <summary>
+        /// Total expected tasks
+        /// </summary>
+        private readonly int _totalExpectedTasks;
+
+        /// <summary>
+        /// Master task Id. It is set in the IGroupCommDriver and must be the same Id used in the TaskManager.
+        /// </summary>
+        private readonly string _masterTaskId;
+
+        /// <summary>
+        /// Total number of application errors received from failed tasks
+        /// </summary>
+        private int _numberOfAppErrors;
+
+        /// <summary>
+        /// Creates a TaskManager with specified total number of tasks and master task id.
+        /// </summary>
+        /// <param name="numTasks">Total number of tasks expected; must be positive.</param>
+        /// <param name="masterTaskId">Id of the master task; must not be null, empty or whitespace.</param>
+        /// <exception cref="IMRUSystemException">
+        /// Thrown if numTasks is smaller than or equal to 0, or masterTaskId is null, empty or whitespace.
+        /// </exception>
+        internal TaskManager(int numTasks, string masterTaskId)
+        {
+            if (numTasks <= 0)
+            {
+                Exceptions.Throw(new IMRUSystemException("Number of expected tasks must be positive"), Logger);
+            }
+
+            if (string.IsNullOrWhiteSpace(masterTaskId))
+            {
+                Exceptions.Throw(new IMRUSystemException("masterTaskId cannot be null"), Logger);
+            }
+
+            _totalExpectedTasks = numTasks;
+            _masterTaskId = masterTaskId;
+        }
+
+        /// <summary>
+        /// Adds a Task to the task collection.
+        /// All validation happens before the collection is modified, so a rejected task is never added.
+        /// </summary>
+        /// <param name="taskId">Id of the task; must be unique.</param>
+        /// <param name="taskConfiguration">Configuration used when the task is submitted.</param>
+        /// <param name="activeContext">Active context the task will be submitted on.</param>
+        /// <exception cref="IMRUSystemException">
+        /// Thrown in the following cases:
+        ///   taskId is null or already added
+        ///   taskConfiguration is null
+        ///   activeContext is null
+        ///   trying to add extra tasks
+        ///   adding the last expected task would leave the collection without the master task
+        /// </exception>
+        internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
+        {
+            if (taskId == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The taskId is null."), Logger);
+            }
+
+            if (_tasks.ContainsKey(taskId))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already exists.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (taskConfiguration == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The task configuration is null."), Logger);
+            }
+
+            if (activeContext == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("The context is null."), Logger);
+            }
+
+            if (NumberOfTasks >= _totalExpectedTasks)
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "Trying to add an additional Task {0}, but the total expected Task number {1} has been reached.", taskId, _totalExpectedTasks);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // If this task fills the last slot, either it or a previously added task must be the master.
+            // Check before inserting so a rejected add leaves the collection unchanged.
+            var fillsLastSlot = NumberOfTasks + 1 == _totalExpectedTasks;
+            if (fillsLastSlot && taskId != _masterTaskId && !MasterTaskExists())
+            {
+                Exceptions.Throw(new IMRUSystemException("There is no master task added."), Logger);
+            }
+
+            _tasks.Add(taskId, new TaskInfo(new TaskStateMachine(), taskConfiguration, activeContext));
+        }
+
+        /// <summary>
+        /// Returns the number of tasks in the task collection
+        /// </summary>
+        internal int NumberOfTasks
+        {
+            get { return _tasks.Count; }
+        }
+
+        /// <summary>
+        /// This method is called when receiving IRunningTask event during task submitting.
+        /// Updates the task state to TaskRunning and adds the IRunningTask to the running tasks collection.
+        /// </summary>
+        /// <param name="runningTask">The running task reported by the evaluator.</param>
+        /// <exception cref="IMRUSystemException">
+        /// Thrown if runningTask is null, running tasks already contains this task, or the tasks collection doesn't contain this task.
+        /// </exception>
+        internal void RecordRunningTask(IRunningTask runningTask)
+        {
+            if (runningTask == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
+            }
+
+            if (_runningTasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already running.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (!_tasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // Transition the state first: if the state machine rejects the event, the task must not be
+            // left behind in the running-task collection.
+            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
+            _runningTasks.Add(runningTask.Id, runningTask);
+        }
+
+        /// <summary>
+        /// This method is called at the beginning of the recovery.
+        /// Clears the task collection and the running task collection, and resets the number of application errors.
+        /// </summary>
+        internal void Reset()
+        {
+            _tasks.Clear();
+            _runningTasks.Clear();
+            _numberOfAppErrors = 0;
+        }
+
+        /// <summary>
+        /// This method is called when receiving ICompletedTask event during task running or system shutting down.
+        /// Removes the task from running tasks and changes the task state from TaskRunning to TaskCompleted.
+        /// </summary>
+        /// <param name="completedTask">The completed task reported by the evaluator.</param>
+        /// <exception cref="IMRUSystemException">Thrown if the task is not in the running task collection.</exception>
+        internal void RecordCompletedTask(ICompletedTask completedTask)
+        {
+            RemoveRunningTask(completedTask.Id);
+            UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
+        }
+
+        /// <summary>
+        /// This method is called when receiving IFailedTask event during task submitting or running.
+        /// Removes the task from running tasks if the task was running, then updates the task state
+        /// based on the error message in the failed task.
+        /// </summary>
+        /// <param name="failedTask">The failed task reported by the evaluator.</param>
+        internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
+        {
+            // The task may not be in the running collection yet (failure during submission), so no existence check.
+            _runningTasks.Remove(failedTask.Id);
+            UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+        }
+
+        /// <summary>
+        /// This method is called when receiving IFailedTask event during system shutting down.
+        /// If the task was waiting for close (it received the close command from driver), its state moves on via ClosedTask.
+        /// Otherwise, as long as it is not already TaskFailedByEvaluatorFailure, the state is updated based on the
+        /// error received (the task could fail by communication error or any other application or system error).
+        /// </summary>
+        /// <param name="failedTask">The failed task reported by the evaluator.</param>
+        internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
+        {
+            var taskState = GetTaskState(failedTask.Id);
+            if (taskState == StateMachine.TaskState.TaskWaitingForClose)
+            {
+                UpdateState(failedTask.Id, TaskStateEvent.ClosedTask);
+            }
+            else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure)
+            {
+                UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+            }
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IFailedEvaluator event during TaskSubmitted, TaskRunning or system shutting down.
+        /// If a task is associated with the failed evaluator, removes it from running tasks when its state is TaskRunning,
+        /// then sends the FailedTaskEvaluatorError event to its state machine.
+        /// </summary>
+        /// <param name="failedEvaluator">The failed evaluator event.</param>
+        internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
+        {
+            if (failedEvaluator.FailedTask.IsPresent())
+            {
+                var taskId = failedEvaluator.FailedTask.Value.Id;
+                var taskState = GetTaskState(taskId);
+                if (taskState == StateMachine.TaskState.TaskRunning)
+                {
+                    RemoveRunningTask(taskId);
+                }
+
+                UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+            }
+        }
+
+        /// <summary>
+        /// Removes a task from the running tasks collection.
+        /// </summary>
+        /// <param name="taskId">Id of the task to remove.</param>
+        /// <exception cref="IMRUSystemException">Thrown if the task is not in the running tasks collection.</exception>
+        private void RemoveRunningTask(string taskId)
+        {
+            if (!_runningTasks.Remove(taskId))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+        }
+
+        /// <summary>
+        /// Updates task state for a given taskId based on the task event
+        /// </summary>
+        /// <param name="taskId">Id of the task whose state machine receives the event.</param>
+        /// <param name="taskEvent">Event to apply.</param>
+        private void UpdateState(string taskId, TaskStateEvent taskEvent)
+        {
+            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are running: every task is in TaskRunning state and
+        /// the running task collection holds the total expected number of tasks.
+        /// </summary>
+        /// <returns>True if all expected tasks are running.</returns>
+        internal bool AreAllTasksRunning()
+        {
+            return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
+                _runningTasks.Count == _totalExpectedTasks;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are completed: every task is in TaskCompleted state, all expected
+        /// tasks were added, and no task remains in the running task collection.
+        /// </summary>
+        /// <returns>True if all expected tasks are completed.</returns>
+        internal bool AreAllTasksCompleted()
+        {
+            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) && _tasks.Count == _totalExpectedTasks && _runningTasks.Count == 0;
+        }
+
+        /// <summary>
+        /// This method is called when receiving either IFailedEvaluator or IFailedTask event.
+        /// Driver tries to close all the running tasks and clears the running task collection in the end.
+        /// If all the tasks were running, the number of running tasks closed should be _totalExpectedTasks - 1.
+        /// If this happens before all the tasks are running, the number should be smaller than _totalExpectedTasks - 1.
+        /// If this happens when no task is running, the number could be 0.
+        /// </summary>
+        /// <param name="closeMessage">Message sent to each running task, encoded as UTF-8.</param>
+        internal void CloseAllRunningTasks(string closeMessage)
+        {
+            Logger.Log(Level.Verbose, "Closing [{0}] running tasks.", _runningTasks.Count);
+            foreach (var runningTask in _runningTasks.Values)
+            {
+                runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
+                UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
+            }
+            _runningTasks.Clear();
+        }
+
+        /// <summary>
+        /// This method is called when receiving an IRunningTask event while the system is either shutting down or failed.
+        /// In this case, the task is not added to Running Tasks.
+        /// The task state moves to TaskRunning, the IRunningTask is closed, then the state moves on via WaitingTaskToClose.
+        /// </summary>
+        /// <param name="runningTask">The running task reported by the evaluator.</param>
+        /// <param name="closeMessage">Message sent to the task, encoded as UTF-8.</param>
+        /// <exception cref="IMRUSystemException">
+        /// Thrown if runningTask is null or the running task is already in the running task collection.
+        /// </exception>
+        internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
+        {
+            if (runningTask == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
+            }
+
+            if (_runningTasks.ContainsKey(runningTask.Id))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] is already in running tasks.", runningTask.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
+            runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
+            UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
+        }
+
+        /// <summary>
+        /// Gets the error event based on the information in IFailedTask.
+        /// Currently the Message in IFailedTask is used to distinguish different types of errors;
+        /// any message other than TaskAppError or TaskGroupCommunicationError maps to FailedTaskSystemError.
+        /// Increments the application error counter when the message is TaskAppError.
+        /// </summary>
+        /// <param name="failedTask">The failed task whose message is inspected.</param>
+        /// <returns>The task state event matching the error.</returns>
+        private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask)
+        {
+            switch (failedTask.Message)
+            {
+                case TaskAppError:
+                    _numberOfAppErrors++;
+                    return TaskStateEvent.FailedTaskAppError;
+                case TaskSystemError:
+                    return TaskStateEvent.FailedTaskSystemError;
+                case TaskGroupCommunicationError:
+                    return TaskStateEvent.FailedTaskCommunicationError;
+                default:
+                    return TaskStateEvent.FailedTaskSystemError;
+            }
+        }
+
+        /// <summary>
+        /// Returns the number of application errors caused by FailedTask
+        /// </summary>
+        /// <returns>Number of TaskAppError failures recorded since construction or the last Reset.</returns>
+        internal int NumberOfAppErrors()
+        {
+            return _numberOfAppErrors;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are in final states
+        /// </summary>
+        /// <returns>True if every task's state machine reports a final state (also true when no task is added).</returns>
+        internal bool AllInFinalState()
+        {
+            return _tasks.All(t => t.Value.TaskState.IsFinalState());
+        }
+
+        /// <summary>
+        /// Gets current state of the task
+        /// </summary>
+        /// <param name="taskId">Id of the task.</param>
+        /// <returns>Current state of the task's state machine.</returns>
+        /// <exception cref="IMRUSystemException">Thrown if the task is not in the task collection.</exception>
+        internal TaskState GetTaskState(string taskId)
+        {
+            var taskInfo = GetTaskInfo(taskId);
+            return taskInfo.TaskState.CurrentState;
+        }
+
+        /// <summary>
+        /// Checks if all the tasks are in the state specified.
+        /// For example, passing TaskState.TaskRunning to check if all the tasks are in TaskRunning state
+        /// </summary>
+        /// <param name="taskState">State to compare against.</param>
+        /// <returns>True if every task is in the given state (also true when no task is added).</returns>
+        internal bool AreAllTasksInState(TaskState taskState)
+        {
+            return _tasks.All(t => t.Value.TaskState.CurrentState == taskState);
+        }
+
+        /// <summary>
+        /// Submits all the tasks on their active contexts and moves each task state via SubmittedTask.
+        /// Tasks are only submitted after all the tasks are added in the collection and the master task exists.
+        /// </summary>
+        /// <exception cref="IMRUSystemException">Thrown if not all the tasks are added or if there is no master task.</exception>
+        internal void SubmitTasks()
+        {
+            if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", NumberOfTasks, _totalExpectedTasks);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            foreach (var entry in _tasks)
+            {
+                entry.Value.ActiveContext.SubmitTask(entry.Value.TaskConfiguration);
+                UpdateState(entry.Key, TaskStateEvent.SubmittedTask);
+            }
+        }
+
+        /// <summary>
+        /// Checks if master task has been added
+        /// </summary>
+        /// <returns>True if the master task id is in the task collection.</returns>
+        private bool MasterTaskExists()
+        {
+            return _tasks.ContainsKey(_masterTaskId);
+        }
+
+        /// <summary>
+        /// Gets the TaskInfo for the given taskId.
+        /// </summary>
+        /// <param name="taskId">Id of the task.</param>
+        /// <returns>The TaskInfo stored for the task.</returns>
+        /// <exception cref="IMRUSystemException">Thrown if the task is not in the task collection.</exception>
+        private TaskInfo GetTaskInfo(string taskId)
+        {
+            TaskInfo taskInfo;
+            if (!_tasks.TryGetValue(taskId, out taskInfo))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] does not exist in the task collection.", taskId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            return taskInfo;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 2f5cf03..b51638a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -88,6 +88,8 @@ under the License.
     <Compile Include="OnREEF\Driver\StateMachine\TaskStateEvent.cs" />
     <Compile Include="OnREEF\Driver\StateMachine\TaskState.cs" />
     <Compile Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" />
+    <Compile Include="OnREEF\Driver\TaskInfo.cs" />
+    <Compile Include="OnREEF\Driver\TaskManager.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" />