You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/16 19:40:51 UTC

reef git commit: [REEF-1511] Add timeout for Task shutdown during IMRU recovery

Repository: reef
Updated Branches:
  refs/heads/master 7adaa97d3 -> 6c597f140


[REEF-1511] Add timeout for Task shutdown during IMRU recovery

During IMRU FT recovery, the tasks that are supposed to be closed
by the driver sometimes don't report back, causing the system to hang.
This change adds a timeout for tasks closed by driver, so that
evaluators of unresponsive tasks are shut down after timeout.

The average task closing time is recorded in the TaskManager.
This number is used as a reference to compute the timeout on the fly.
In case the average is read before any data has been accumulated,
or the average is too low in some scenarios, there is
a configurable MinTaskWaitingForCloseTimeout to ensure that
the driver waits long enough before killing the evaluators.

JIRA:
  [REEF-1511](https://issues.apache.org/jira/browse/REEF-1511)

Pull request:
  This closes #1201


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6c597f14
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6c597f14
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6c597f14

Branch: refs/heads/master
Commit: 6c597f140184471969ce8bac4c96fc41ca8ea36d
Parents: 7adaa97
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Dec 13 17:33:37 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Fri Dec 16 11:36:56 2016 -0800

----------------------------------------------------------------------
 .../TestTaskManager.cs                          |  43 ++++++
 .../OnREEF/Driver/IMRUDriver.cs                 | 146 ++++++++++++++++++-
 .../OnREEF/Driver/TaskInfo.cs                   |   7 +
 .../OnREEF/Driver/TaskManager.cs                | 111 ++++++++++----
 .../Parameters/MinTaskWaitingForCloseTimeout.cs |  26 ++++
 .../Parameters/TimeoutMonitoringInterval.cs     |  26 ++++
 .../Org.Apache.REEF.IMRU.csproj                 |   2 +
 7 files changed, 331 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index 7787d1a..d0cdda8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Threading;
 using NSubstitute;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
@@ -177,6 +178,22 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
+        /// Tests AverageClosingTime
+        /// </summary>
+        [Fact]
+        public void TestTasksClosingTime()
+        {
+            var taskManager = TaskManagerWithTasksRunning();
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Thread.Sleep(100);
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+
+            Assert.True(taskManager.AverageClosingTime() > 0);
+        }
+
+        /// <summary>
         /// Tests RecordCompletedRunningTask
         /// </summary>
         [Fact]
@@ -214,6 +231,32 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
+        /// Tests that tasks stuck in TaskWaitingForClose past the timeout are found and can be recorded as killed
+        /// </summary>
+        [Fact]
+        public void TestTasksWaitingForClose()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+
+            taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+            taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+            var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Thread.Sleep(100);
+            var tasks = taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, 50);
+            Assert.Equal(3, tasks.Count);
+
+            foreach (var t in tasks)
+            {
+                taskManager.RecordKillClosingTask(t.Key);
+            }
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskClosedByDriver));
+        }
+
+        /// <summary>
         /// Tests record failed tasks after all the tasks are running
         /// </summary>
         [Fact]

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 78e0c4d..d0cd537 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -20,6 +20,7 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
+using System.Timers;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
@@ -91,6 +92,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private readonly object _lock = new object();
 
         /// <summary>
+        /// Factor by which the average closing time is multiplied, to give tasks room to close by themselves.
+        /// </summary>
+        private const int TaskWaitingForCloseTimeFactor = 3;
+
+        /// <summary>
         /// Manages Tasks, maintains task states and responsible for task submission for the driver.
         /// </summary>
         private TaskManager _taskManager;
@@ -127,10 +133,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private int _numberOfRetries;
 
         /// <summary>
+        /// Minimum timeout in milliseconds for TaskWaitingForClose
+        /// </summary>
+        private readonly int _minTaskWaitingForCloseTimeout;
+
+        /// <summary>
         /// Manages lifecycle events for driver, like JobCancelled event.
         /// </summary>
         private readonly List<IDisposable> _disposableResources = new List<IDisposable>();
 
+        /// <summary>
+        /// An internal timer that monitors the timeout for driver events
+        /// </summary>
+        private Timer _timeoutMonitorTimer;
+
+        /// <summary>
+        /// Record evaluator ids that are closed after timeout.
+        /// The CompletedTask and FailedEvaluator events from those evaluators should be ignored to avoid double counting.
+        /// </summary>
+        private readonly IList<string> _evaluatorsForceClosed = new List<string>();
+
         [Inject]
         private IMRUDriver(IPartitionedInputDataSet dataSet,
             [Parameter(typeof(PerMapConfigGeneratorSet))] ISet<IPerMapperConfigGenerator> perMapperConfigs,
@@ -142,6 +164,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
             [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
             [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
+            [Parameter(typeof(MinTaskWaitingForCloseTimeout))] int minTaskWaitingForCloseTimeout,
+            [Parameter(typeof(TimeoutMonitoringInterval))] int timeoutMonitoringInterval,
             [Parameter(typeof(InvokeGC))] bool invokeGC,
             IGroupCommDriver groupCommDriver,
             INameServer nameServer,
@@ -154,6 +178,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             _totalMappers = dataSet.Count;
             _invokeGC = invokeGC;
             _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery;
+            _minTaskWaitingForCloseTimeout = minTaskWaitingForCloseTimeout;
 
             _contextManager = new ActiveContextManager(_totalMappers + 1);
             _contextManager.Subscribe(this);
@@ -172,15 +197,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 var handle = lifecycleManager.Subscribe(this as IObserver<IJobCancelled>);
                 _disposableResources.Add(handle);
             }
-            
+
+            _timeoutMonitorTimer = new Timer();
+            _timeoutMonitorTimer.Elapsed += TimeoutMonitor;
+            _timeoutMonitorTimer.Interval = timeoutMonitoringInterval;
+            if (timeoutMonitoringInterval > 0)
+            {
+                _timeoutMonitorTimer.Enabled = true;
+            }
+
             var msg =
-                string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.",
+                string.Format(CultureInfo.InvariantCulture, "map task memory: {0}, update task memory: {1}, map task cores: {2}, update task cores: {3}, maxRetry: {4}, allowedFailedEvaluators: {5}, minTaskWaitingForCloseTimeout: {6}, timeoutMonitoringInterval: {7}.",
                     memoryPerMapper,
                     memoryForUpdateTask,
                     coresPerMapper,
                     coresForUpdateTask,
                     _maxRetryNumberForFaultTolerant,
-                    allowedFailedEvaluators);
+                    allowedFailedEvaluators,
+                    minTaskWaitingForCloseTimeout,
+                    timeoutMonitoringInterval);
             Logger.Log(Level.Info, msg);
         }
 
@@ -446,6 +481,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries);
             lock (_lock)
             {
+                if (_evaluatorsForceClosed.Contains(completedTask.ActiveContext.EvaluatorId))
+                {
+                    Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring ICompletedTask event.", completedTask.ActiveContext.EvaluatorId, completedTask.Id);
+                    return;
+                }
                 switch (_systemState.CurrentState)
                 {
                     case SystemState.TasksRunning:
@@ -516,6 +556,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
                 {
+                    if (_evaluatorsForceClosed.Contains(failedEvaluator.Id))
+                    {
+                        Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedEvaluator event.", failedEvaluator.Id, failedEvaluator.FailedTask.IsPresent() ? failedEvaluator.FailedTask.Value.Id : "NoTaskId");
+                        return;
+                    }
+
                     var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
                     _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
                     _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
@@ -644,6 +690,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedTask"))
                 {
+                    if (_evaluatorsForceClosed.Contains(failedTask.GetActiveContext().Value.EvaluatorId))
+                    {
+                        Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedTask event..", failedTask.GetActiveContext().Value.EvaluatorId, failedTask.Id);
+                        return;
+                    }
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.SubmittingTasks:
@@ -674,6 +725,82 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
         #endregion IFailedTask
 
+        private void TimeoutMonitor(object source, ElapsedEventArgs e)
+        {
+            Logger.Log(Level.Info, "Entering TimeoutMonitor at {0}", DateTime.Now);
+            lock (_lock)
+            {
+                switch (_systemState.CurrentState)
+                {
+                    // TODO: Handle time out if ActiveContexts are not received in timeout limit
+                    case SystemState.WaitingForEvaluator:
+                        break;
+
+                    // TODO: Handle time out if RunningTasks are not received in timeout limit
+                    case SystemState.SubmittingTasks:
+                        break;
+
+                    // TODO: Handle time out if CompletedTasks are not received in timeout limit
+                    case SystemState.TasksRunning:
+                        break;
+
+                    // Handle timeout for closing tasks
+                    case SystemState.ShuttingDown:
+                        Logger.Log(Level.Info, "_taskManager.AverageClosingTime {0}, _minTaskWaitingForCloseTimeout: {1}", _taskManager.AverageClosingTime(), _minTaskWaitingForCloseTimeout);
+                        int taskClosingTimeout = Math.Max(_minTaskWaitingForCloseTimeout, _taskManager.AverageClosingTime() * TaskWaitingForCloseTimeFactor);
+                        var waitingTasks = _taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, taskClosingTimeout);
+
+                        if (waitingTasks.Any())
+                        {
+                            WaitingForCloseTaskNoResponseAction(waitingTasks);
+                        }
+                        break;
+
+                    case SystemState.TasksCompleted:
+                        break;
+
+                    case SystemState.Fail:
+                        break;
+                }
+            }
+        }
+
+        /// <summary>
+        /// For tasks that are in TaskWaitingForClose state and have not responded within the specified timeout,
+        /// kill the evaluator and update the other states as if we had received a FailedEvaluator event.
+        /// Then try recovery.
+        /// </summary>
+        /// <param name="tasks"></param>
+        private void WaitingForCloseTaskNoResponseAction(IList<KeyValuePair<string, TaskInfo>> tasks)
+        {
+            foreach (var t in tasks)
+            {
+                string evaluatorId = t.Value.ActiveContext.EvaluatorId;
+                if (!_evaluatorsForceClosed.Contains(evaluatorId))
+                {
+                    _evaluatorsForceClosed.Add(evaluatorId);
+                    Logger.Log(Level.Info,
+                        "WaitingForCloseTask [{0}] has no response after timeout. Kill the evaluator: [{1}] and dispose the context: [{2}].",
+                        t.Key,
+                        evaluatorId,
+                        t.Value.ActiveContext.Id);
+
+                    t.Value.ActiveContext.Dispose();
+                    var isMaster = _evaluatorManager.IsMasterEvaluatorId(evaluatorId);
+                    _evaluatorManager.RecordFailedEvaluator(evaluatorId);
+                    _contextManager.Remove(t.Value.ActiveContext.Id);
+                    _taskManager.RecordKillClosingTask(t.Key);
+
+                    // Push evaluator id back to PartitionIdProvider if it is not master
+                    if (!isMaster)
+                    {
+                        _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(evaluatorId);
+                    }
+                }
+            }
+            TryRecovery();
+        }
+
         public void OnNext(IJobCancelled value)
         {
             lock (_lock)
@@ -1087,5 +1214,18 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             } 
             return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName; 
         }
+
+        /// <summary>
+        /// Ensure the Timer is disposed when the driver object is deleted
+        /// </summary>
+        ~IMRUDriver()
+        {
+            if (_timeoutMonitorTimer != null)
+            {
+                _timeoutMonitorTimer.Stop();
+                _timeoutMonitorTimer.Dispose();
+                _timeoutMonitorTimer = null;
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
index 6ae992d..deb027d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
 using Org.Apache.REEF.Tang.Interface;
@@ -38,6 +39,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             _taskState = taskState;
             _taskConfiguration = config;
             _activeContext = context;
+            TimeStateUpdated = DateTime.Now;
         }
 
         internal TaskStateMachine TaskState
@@ -54,5 +56,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             get { return _activeContext; }
         }
+
+        /// <summary>
+        /// time that the task state is updated
+        /// </summary>
+        internal DateTime TimeStateUpdated { get; set; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index b2fe281..4ba9745 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -89,6 +89,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private int _numberOfAppErrors = 0;
 
         /// <summary>
+        /// Total Task closing time span. It is used to calculate the average closing time.
+        /// </summary>
+        private TimeSpan _totalTaskClosingTimeSpan;
+
+        /// <summary>
+        /// Total number of the tasks that is closed by driver and then completed.
+        /// </summary>
+        private int _totalNumberOfClosedTasksByDriver;
+
+        /// <summary>
         /// Indicate if master task is completed running properly
         /// </summary>
         private bool _masterTaskCompletedRunning = false;
@@ -124,9 +134,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         ///   trying to add extra tasks
         ///   No Master Task is added in the collection
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <param name="taskConfiguration"></param>
-        /// <param name="activeContext"></param>
         internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
         {
             if (taskId == null)
@@ -177,7 +184,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Adds the IRunningTask to the running tasks collection and update the task state to TaskRunning.
         /// Throws IMRUSystemException if running tasks already contains this task or tasks collection doesn't contain this task.
         /// </summary>
-        /// <param name="runningTask"></param>
         internal void RecordRunningTask(IRunningTask runningTask)
         {
             if (_runningTasks.ContainsKey(runningTask.Id))
@@ -215,7 +221,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Changes the task state from RunningTask to CompletedTask if the task was running
         /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
         /// </summary>
-        /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
         {
             if (completedTask.Id.Equals(_masterTaskId))
@@ -234,7 +239,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Removes the task from running tasks if the task was running
         /// Updates the task state to fail based on the error message in the failed task
         /// </summary>
-        /// <param name="failedTask"></param>
         internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
         {
             //// Remove the task from running tasks if it exists there
@@ -248,7 +252,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Task could fail by communication error or any other application or system error during this time, as long as it is not 
         /// TaskFailedByEvaluatorFailure, update the task state based on the error received. 
         /// </summary>
-        /// <param name="failedTask"></param>
         internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
         {
             Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString());
@@ -269,7 +272,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Removes the task from RunningTasks if the task associated with the FailedEvaluator is present and running. 
         /// Sets the task state to TaskFailedByEvaluatorFailure 
         /// </summary>
-        /// <param name="failedEvaluator"></param>
         internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
         {
             if (failedEvaluator.FailedTask.IsPresent())
@@ -301,6 +303,29 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             }
         }
 
+        /// <summary>
+        /// A task waiting for close has not responded in the given time.
+        /// Driver will kill the evaluator and raise FailedTaskEvaluatorError on the task, which moves it to TaskClosedByDriver state (as asserted in TestTasksWaitingForClose)
+        /// </summary>
+        internal void RecordKillClosingTask(string taskId)
+        {
+            var taskInfo = GetTaskInfo(taskId);
+            if (!taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                           "The task [{0}] is in [{1}] state, expecting it is in TaskWaitingForClose state.",
+                           taskId, taskInfo.TaskState.CurrentState);
+                Logger.Log(Level.Error, msg);
+                throw new IMRUSystemException(msg);
+            }
+            UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+        }
+
+        /// <summary>
+        /// Find the task that is associated with the given evaluator
+        /// </summary>
+        /// <param name="evaluatorId"></param>
+        /// <returns></returns>
         private string FindTaskAssociatedWithTheEvalutor(string evaluatorId)
         {
             return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault();
@@ -309,17 +334,46 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Updates task state for a given taskId based on the task event
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <param name="taskEvent"></param>
         private void UpdateState(string taskId, TaskStateEvent taskEvent)
         {
-            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
+            var taskInfo = GetTaskInfo(taskId);
+            RecordingTime(taskId, taskInfo, taskEvent);
+            taskInfo.TaskState.MoveNext(taskEvent);
+            taskInfo.TimeStateUpdated = DateTime.Now;
         }
 
         /// <summary>
-        /// Returns true if master task has completed and produced result
+        /// Records how long a task took to go from TaskWaitingForClose to completed; the totals feed AverageClosingTime
+        /// The method can be extended to record the time for other task states
+        /// The log level should be changed to Verbose once we complete the testing
+        /// </summary>
+        private void RecordingTime(string taskId, TaskInfo taskInfo, TaskStateEvent taskEvent)
+        {
+            if (taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose) && taskEvent.Equals(TaskStateEvent.CompletedTask))
+            {
+                var timeSpan = DateTime.Now - taskInfo.TimeStateUpdated;
+                _totalNumberOfClosedTasksByDriver++;
+                _totalTaskClosingTimeSpan = _totalTaskClosingTimeSpan.Add(timeSpan);
+                Logger.Log(Level.Info, "RecordClosingTime for task id {0}, closing time: {1}, average closing time: {2}.", taskId, timeSpan.TotalMilliseconds, _totalTaskClosingTimeSpan.TotalMilliseconds / _totalNumberOfClosedTasksByDriver);
+            }
+        }
+
+        /// <summary>
+        /// Gets the average closing time, in whole milliseconds, of tasks closed by driver; 0 if none has been recorded yet
         /// </summary>
         /// <returns></returns>
+        internal int AverageClosingTime()
+        {
+            if (_totalNumberOfClosedTasksByDriver != 0)
+            {
+                return (int)(_totalTaskClosingTimeSpan.TotalMilliseconds / _totalNumberOfClosedTasksByDriver);
+            }
+            return 0;
+        }
+
+        /// <summary>
+        /// Returns true if master task has completed and produced result
+        /// </summary>
         internal bool IsMasterTaskCompletedRunning()
         {
             return _masterTaskCompletedRunning;
@@ -328,7 +382,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Checks if all the tasks are running.
         /// </summary>
-        /// <returns></returns>
         internal bool AreAllTasksRunning()
         {
             return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
@@ -339,13 +392,30 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// When master task is completed, that means the system has got the result expected 
         /// regardless of other mapper tasks returned or not. 
         /// </summary>
-        /// <returns></returns>
         internal bool IsJobDone()
         {
             return IsMasterTaskCompletedRunning();
         }
 
         /// <summary>
+        /// Finds all the tasks that are in the given state and have stayed in it longer than the given timeout
+        /// </summary>
+        internal IList<KeyValuePair<string, TaskInfo>> TasksTimeoutInState(TaskState state, int timeoutMilliseconds)
+        {
+            return _tasks.Where(t => t.Value.TaskState.CurrentState.Equals(state) && Timeout(t.Value.TimeStateUpdated, timeoutMilliseconds))
+                .ToList();
+        }
+
+        /// <summary>
+        /// Returns true if more than timeoutMilliseconds have elapsed since the given time (uses TotalMilliseconds, not the 0-999 Milliseconds component)
+        /// </summary>
+        private static bool Timeout(DateTime time, int timeoutMilliseconds)
+        {
+            TimeSpan span = DateTime.Now - time;
+            return span.TotalMilliseconds > timeoutMilliseconds;
+        }
+
+        /// <summary>
         /// This method is called when receiving either IFailedEvaluator or IFailedTask event
         /// Driver tries to close all the running tasks and clean the running task collection in the end.
         /// If all the tasks are running, the total number of running tasks should be _totalExpectedTasks -1
@@ -371,8 +441,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Then move the task state to WaitingTaskToClose
         /// Throw IMRUSystemException if runningTask is null or the running task is already added in the running task collection
         /// </summary>
-        /// <param name="runningTask"></param>
-        /// <param name="closeMessage"></param>
         internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
         {
             if (runningTask == null)
@@ -396,8 +464,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// For unknown exceptions or exceptions that doesn't belong to defined IMRU task exceptions
         /// treat then as application error.
         /// </summary>
-        /// <param name="failedTask"></param>
-        /// <returns></returns>
         private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask)
         {
             var exception = failedTask.AsError();
@@ -457,7 +523,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Returns the number of application error caused by FailedTask
         /// </summary>
-        /// <returns></returns>
         internal int NumberOfAppErrors()
         {
             return _numberOfAppErrors;
@@ -466,7 +531,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Checks if all the tasks are in final states
         /// </summary>
-        /// <returns></returns>
         internal bool AreAllTasksInFinalState()
         {
             var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList();
@@ -504,8 +568,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Gets current state of the task
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <returns></returns>
         internal TaskState GetTaskState(string taskId)
         {
             var taskInfo = GetTaskInfo(taskId);
@@ -516,8 +578,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Checks if all the tasks are in the state specified. 
         /// For example, passing TaskState.TaskRunning to check if all the tasks are in TaskRunning state
         /// </summary>
-        /// <param name="taskState"></param>
-        /// <returns></returns>
         internal bool AreAllTasksInState(TaskState taskState)
         {
             return _tasks.All(t => t.Value.TaskState.CurrentState == taskState);
@@ -566,7 +626,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Checks if master task has been added
         /// </summary>
-        /// <returns></returns>
         private bool MasterTaskExists()
         {
             return _tasks.ContainsKey(_masterTaskId);
@@ -576,8 +635,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Gets task Tuple based on the given taskId. 
         /// Throws IMRUSystemException if the task Tuple is not in the task collection.
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <returns></returns>
         private TaskInfo GetTaskInfo(string taskId)
         {
             TaskInfo taskInfo;

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs
new file mode 100644
index 0000000..2124eef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs
@@ -0,0 +1,26 @@
+\ufeff// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("Minimum timeout after which unresponsive tasks which are supposed to be closed will be killed together with their evaluators.", "TaskWaitingForCloseTimeout", "30000")]
+    public sealed class MinTaskWaitingForCloseTimeout : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs
new file mode 100644
index 0000000..bf5247d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs
@@ -0,0 +1,26 @@
+\ufeff// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("Interval at which checks for timeout are done, in milliseconds.", "TimeoutMonitoringInterval", "50000")]
+    public sealed class TimeoutMonitoringInterval : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6c597f14/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 330f4a0..9dc3e9a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -128,6 +128,8 @@ under the License.
     <Compile Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedUpdateTaskStateConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SleepIntervalParameter.cs" />
+    <Compile Include="OnREEF\Parameters\MinTaskWaitingForCloseTimeout.cs" />
+    <Compile Include="OnREEF\Parameters\TimeoutMonitoringInterval.cs" />
     <Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
     <Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
     <Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />