You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/13 01:54:05 UTC

reef git commit: [REEF-1685] Complete the Job properly if update/master task is completed from running state

Repository: reef
Updated Branches:
  refs/heads/master eb66b2e9d -> 765d7f616


[REEF-1685] Complete the Job properly if update/master task is completed from running state

In stress testing, we have seen the following scenario: the master/update task completes successfully
and most of the mapper tasks also complete, but then the driver receives an IFailedEvaluator event.
This causes the system to transition to the ShuttingDown state and perform an unnecessary retry.

When the driver receives an ICompletedTask event from the master task while that task was in the running state,
it means the calculation has completed and the result has been written to the output.
The driver should then execute DoneAction to dispose of all the contexts and shut down the system.
After that, any FailedEvaluator/FailedTask events received should be ignored.

JIRA:
  [REEF-1685](https://issues.apache.org/jira/browse/REEF-1685)

Pull request:
  This closes #1200


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/765d7f61
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/765d7f61
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/765d7f61

Branch: refs/heads/master
Commit: 765d7f616844ed947549465ce991b9ee71b4c190
Parents: eb66b2e
Author: Julia Wang <ju...@apache.org>
Authored: Tue Dec 6 12:50:41 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Mon Dec 12 17:17:15 2016 -0800

----------------------------------------------------------------------
 .../TestTaskManager.cs                          | 19 ++++++
 .../OnREEF/Driver/IMRUDriver.cs                 | 71 ++++++++++++--------
 .../OnREEF/Driver/TaskManager.cs                | 32 +++++++--
 .../Functional/IMRU/IMRUBroadcastReduceTest.cs  |  3 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs |  4 +-
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      |  3 +-
 .../IMRU/TestFailMapperTasksOnDispose.cs        |  3 +-
 .../IMRU/TestFailMapperTasksOnInit.cs           |  3 +-
 8 files changed, 100 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index d35f7c8..7787d1a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -177,6 +177,25 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
+        /// Tests RecordCompletedRunningTask
+        /// </summary>
+        [Fact]
+        public void TestIsMasterCompleted()
+        {
+            var taskManager = TaskManagerWithTasksRunning();
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskRunning));
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
+            Assert.False(taskManager.IsMasterTaskCompletedRunning());
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+            Assert.True(taskManager.IsMasterTaskCompletedRunning());
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskCompleted));
+        }
+
+        /// <summary>
         /// Tests closing running tasks
         /// </summary>
         [Fact]

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index a895a78..78e0c4d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -430,10 +430,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// ICompletedTask handler. It is called when a task is completed. The following action will be taken based on the System State:
         /// Case TasksRunning
-        ///     Updates task state to TaskCompleted
+        ///     Check if it is master task, then set master task completed    
+        ///     Then record completed running and updates task state from TaskRunning to TaskCompleted
         ///     If all tasks are completed, sets system state to TasksCompleted and then go to Done action
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the event        
         /// Case ShuttingDown
-        ///     Updates task state to TaskCompleted
+        ///     Record completed running and updates task state to TaskCompleted
         ///     Try to recover
         /// Other cases - not expected 
         /// </summary>
@@ -447,18 +450,24 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 {
                     case SystemState.TasksRunning:
                         _taskManager.RecordCompletedTask(completedTask);
-                        if (_taskManager.AreAllTasksCompleted())
+                        if (_taskManager.IsJobDone())
                         {
                             _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
-                            Logger.Log(Level.Info, "All tasks are completed, systemState {0}", _systemState.CurrentState);
+                            Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState);
                             DoneAction();
                         }
                         break;
+
                     case SystemState.ShuttingDown:
                         // The task might be in running state or waiting for close, record the completed task
                         _taskManager.RecordCompletedTask(completedTask);
                         TryRecovery();
                         break;
+
+                    case SystemState.TasksCompleted:
+                        _taskManager.RecordCompletedTask(completedTask);
+                        break;
+
                     default:
                         UnexpectedState(completedTask.Id, "ICompletedTask");
                         break;
@@ -470,7 +479,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         #region IFailedEvaluator
         /// <summary>
         /// IFailedEvaluator handler. It specifies what to do when an evaluator fails.
-        /// If we get all completed tasks then ignore the failure. Otherwise, take the following actions based on the system state: 
         /// Case WaitingForEvaluator
         ///     This happens in the middle of submitting contexts. We just need to remove the failed evaluator 
         ///     from EvaluatorManager and remove associated active context, if any, from ActiveContextManager
@@ -483,6 +491,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         ///     Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure
         ///     Closes all the other running tasks
         ///     Try to recover in case it is the last failure received
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the failure. 
         /// Case ShuttingDown
         ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
         ///     Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
@@ -506,14 +516,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
                 {
-                    if (_taskManager != null && _taskManager.AreAllTasksCompleted())
-                    {
-                        Logger.Log(Level.Verbose,
-                            "All IMRU tasks have been completed. So ignoring the Evaluator {0} failure.",
-                            failedEvaluator.Id);
-                        return;
-                    }
-
                     var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
                     _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
                     _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
@@ -557,6 +559,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                             TryRecovery();
                             break;
 
+                        case SystemState.TasksCompleted:
+                            _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Evaluator {0} failure.", failedEvaluator.Id);
+                            break;
+
                         case SystemState.ShuttingDown:
                             _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
 
@@ -591,30 +598,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="failedContext"></param>
         public void OnNext(IFailedContext failedContext)
         {
+            Logger.Log(Level.Warning, "Received IFailedContext with Id: {0} from endpoint {1} with systemState {2} in retry#: {3}.", failedContext.Id, GetEndPointFromContext(failedContext), _systemState.CurrentState, _numberOfRetries);
             lock (_lock)
             {
-                if (_taskManager.AreAllTasksCompleted())
+                using (Logger.LogFunction("IMRUDriver::IFailedContext"))
                 {
-                    Logger.Log(Level.Info, "Context with Id: {0} failed but IMRU tasks are completed. So ignoring.", failedContext.Id);
-                    return;
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.TasksCompleted:
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Context {0} failure.", failedContext.Id);
+                            break;
+                        case SystemState.ShuttingDown:
+                        case SystemState.Fail:
+                            break;
+                        default:
+                            var msg = string.Format(CultureInfo.InvariantCulture, "Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
+                            throw new NotImplementedException(msg);
+                    }
                 }
-
-                var msg = string.Format("Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
-                Exceptions.Throw(new Exception(msg), Logger);
             }
         }
         #endregion IFailedContext
 
         #region IFailedTask
         /// <summary>
-        /// IFailedTask handler. It specifies what to do when task fails.
-        /// If we get all completed tasks then ignore the failure. Otherwise take the following actions based on the System state:
         /// Case SubmittingTasks/TasksRunning
         ///     This is the first failure received
         ///     Changes the system state to ShuttingDown
         ///     Record failed task in TaskManager
         ///     Closes all the other running tasks and set their state to TaskWaitingForClose
         ///     Try to recover
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the failure. 
         /// Case ShuttingDown
         ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
         ///     Record failed task in TaskManager.
@@ -629,14 +644,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedTask"))
                 {
-                    if (_taskManager.AreAllTasksCompleted())
-                    {
-                        Logger.Log(Level.Info,
-                            "Task with Id: {0} failed but all IMRU tasks are completed. So ignoring.",
-                            failedTask.Id);
-                        return;
-                    }
-
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.SubmittingTasks:
@@ -648,6 +655,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                             TryRecovery();
                             break;
 
+                        case SystemState.TasksCompleted:
+                            _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Task {0} failure.", failedTask.Id);
+                            break;
+
                         case SystemState.ShuttingDown:
                             _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask);
                             TryRecovery();
@@ -725,6 +737,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         private void DoneAction()
         {
+            Logger.Log(Level.Info, "Shutting down Evaluators!!!");
             ShutDownAllEvaluators();
             Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
             DisposeResources();

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index a37fa3b..b2fe281 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -89,6 +89,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private int _numberOfAppErrors = 0;
 
         /// <summary>
+        /// Indicate if master task is completed running properly
+        /// </summary>
+        private bool _masterTaskCompletedRunning = false;
+
+        /// <summary>
         /// Creates a TaskManager with specified total number of tasks and master task id.
         /// Throws IMRUSystemException if numTasks is smaller than or equals to 0 or masterTaskId is null.
         /// </summary>
@@ -204,6 +209,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
         /// <summary>
         /// This method is called when receiving ICompletedTask event during task running or system shutting down.
+        /// If it is master task and if the master task was running, mark _masterTaskCompletedRunning true. That indicates 
+        /// master task has successfully completed, which means the system has got the result from master task. 
         /// Removes the task from running tasks if it was running
         /// Changes the task state from RunningTask to CompletedTask if the task was running
         /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
@@ -211,6 +218,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
         {
+            if (completedTask.Id.Equals(_masterTaskId))
+            {
+                if (GetTaskInfo(completedTask.Id).TaskState.CurrentState.Equals(TaskState.TaskRunning))
+                {
+                    _masterTaskCompletedRunning = true;
+                }
+            }
             _runningTasks.Remove(completedTask.Id);
             UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
         }
@@ -303,6 +317,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Returns true if master task has completed and produced result
+        /// </summary>
+        /// <returns></returns>
+        internal bool IsMasterTaskCompletedRunning()
+        {
+            return _masterTaskCompletedRunning;
+        }
+
+        /// <summary>
         /// Checks if all the tasks are running.
         /// </summary>
         /// <returns></returns>
@@ -313,12 +336,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Checks if all the tasks are completed.
+        /// When master task is completed, that means the system has got the result expected 
+        /// regardless of other mapper tasks returned or not. 
         /// </summary>
         /// <returns></returns>
-        internal bool AreAllTasksCompleted()
+        internal bool IsJobDone()
         {
-            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) && _tasks.Count == _totalExpectedTasks && _runningTasks.Count == 0;
+            return IsMasterTaskCompletedRunning();
         }
 
         /// <summary>
@@ -528,8 +552,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     }
                     SubmitTask(taskId);
                 }
+            }
         }
-    }
 
         private void SubmitTask(string taskId)
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
index d1841e8..1db3d80 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
@@ -51,7 +51,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var runningTaskCount = GetMessageCount(lines, "Received IRunningTask");
             var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator");
             var failedTaskCount = GetMessageCount(lines, "Received IFailedTask");
-            Assert.Equal((NumOfRetry + 1) * NumNodes, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True((NumOfRetry + 1) * NumNodes >= completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True(NumOfRetry * NumNodes < completedTaskCount + failedEvaluatorCount + failedTaskCount);
             Assert.Equal((NumOfRetry + 1) * NumNodes, runningTaskCount);
             CleanUp(testFolder);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index 1898066..cd94f43 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -67,8 +67,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 
             // on each try each task should fail or complete or disappear with failed evaluator
             // and on each try all tasks should start successfully
-            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True((NumberOfRetry + 1) * numTasks >= completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True(NumberOfRetry * numTasks < completedTaskCount + failedEvaluatorCount + failedTaskCount);
             Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);
             CleanUp(testFolder);

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index b94d699..71c14d4 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -64,7 +64,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             // Rest of the tasks should be canceled and send completed task event to the driver. 
             Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
-            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+            Assert.True(((NumberOfRetry + 1) * numTasks) - failedEvaluatorCount >= completedTaskCount);
+            Assert.True((NumberOfRetry * numTasks) - failedEvaluatorCount < completedTaskCount);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
index 1d1176c..d027b8f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
@@ -62,7 +62,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             // No failed evaluators or tasks.
             Assert.Equal(0, failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
-            Assert.Equal(numTasks, completedTaskCount);
+            Assert.True(numTasks >= completedTaskCount);
+            Assert.True(completedTaskCount >= 1);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);

http://git-wip-us.apache.org/repos/asf/reef/blob/765d7f61/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
index cd2d6b3..b695d8a 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
@@ -63,7 +63,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             // Rest of the tasks should be canceled and send completed task event to the driver. 
             Assert.Equal(0, failedEvaluatorCount);
             Assert.Equal(NumberOfRetry * 2, failedTaskCount);
-            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+            Assert.True(((NumberOfRetry + 1) * numTasks) - failedTaskCount >= completedTaskCount);
+            Assert.True((NumberOfRetry * numTasks) - failedTaskCount < completedTaskCount);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);