You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2017/05/03 22:16:40 UTC

reef git commit: [REEF-1778] Ensure ITask.Dispose() is executed

Repository: reef
Updated Branches:
  refs/heads/master 5e7d99be3 -> 2ce19a571


[REEF-1778] Ensure ITask.Dispose() is executed

This moves the reporting of the result of a task *after* the call to `Dispose`
on that task to ensure execution of that method.

To do so, I made the calls to the task start and stop handlers explicit, and
the `TaskRuntime` now keeps track of any exceptions thrown and of whether a
result needs to be sent to the Driver.

Also, this fixes a bug in `TaskRuntimeTests` which did not wait for the
spawned thread to finish before validation of the test results.

JIRA:
  [REEF-1778](https://issues.apache.org/jira/browse/REEF-1778)

Pull Request:
  This closes #1293


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2ce19a57
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2ce19a57
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2ce19a57

Branch: refs/heads/master
Commit: 2ce19a571df020a9f2114d33bb9be92df5cc7ca1
Parents: 5e7d99b
Author: Markus Weimer <we...@apache.org>
Authored: Mon Apr 17 18:50:03 2017 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Wed May 3 15:14:24 2017 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/Task/TaskRuntime.cs       | 51 +++++++++++++++-----
 .../Runtime/Evaluator/Task/TaskStatus.cs        | 31 ++++++++++--
 .../TaskRuntimeTests.cs                         |  2 +-
 3 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 39c51f6..863b30d 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -93,54 +93,81 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
             var taskThread = new Thread(() =>
             {
+                // The result of the task execution.
+                byte[] resultReturnedByTask = null;
+
+                // Whether or not a result shall be returned to the Driver.
+                bool returnResultToDriver = true;
+
+                // Exception thrown during `Dispose`, if any.
+                Exception exceptionThrownByTaskDispose = null;
+
                 try
                 {
+                    // Run the handlers for `TaskStart`
                     Logger.Log(Level.Verbose, "Set running status for task");
+                    _currentStatus.RunTaskStartHandlers();
+
+                    // Update the state
                     _currentStatus.SetRunning();
+
+                    // Call the Task
                     Logger.Log(Level.Verbose, "Calling into user's task.");
-                    var result = _userTask.Call(null);
+                    resultReturnedByTask = _userTask.Call(null);
                     Logger.Log(Level.Info, "Task Call Finished");
-                    _currentStatus.SetResult(result);
 
-                    const Level resultLogLevel = Level.Verbose;
+                    // Run the handlers for `TaskStop`
+                    _currentStatus.RunTaskStopHandlers();
 
-                    if (Logger.IsLoggable(resultLogLevel) && result != null && result.Length > 0)
+                    // Log the result
+                    const Level resultLogLevel = Level.Verbose;
+                    if (Logger.IsLoggable(resultLogLevel) && resultReturnedByTask != null && resultReturnedByTask.Length > 0)
                     {
                         Logger.Log(resultLogLevel,
-                            "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+                            "Task running result:\r\n" + System.Text.Encoding.Default.GetString(resultReturnedByTask));
                     }
                 }
                 catch (TaskStartHandlerException e)
                 {
                     Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException");
                     _currentStatus.SetException(e.InnerException);
+                    returnResultToDriver = false;
                 }
                 catch (TaskStopHandlerException e)
                 {
                     Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException");
                     _currentStatus.SetException(e.InnerException);
+                    returnResultToDriver = false;
                 }
                 catch (Exception e)
                 {
                     Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType());
                     _currentStatus.SetException(e);
+                    returnResultToDriver = false;
                 }
                 finally
                 {
                     try
                     {
-                        if (_userTask != null)
-                        {
-                            _userTask.Dispose();
-                        }
+                        _userTask.Dispose();
                     }
                     catch (Exception e)
                     {
-                        var msg = "Exception during Task Dispose in task Call()";
-                        Logger.Log(Level.Error, msg);
-                        throw new InvalidOperationException(msg, e);
+                        exceptionThrownByTaskDispose = new InvalidOperationException("Exception during Task Dispose in task Call()", e);
                     }
                 }
+
+                // Inform the driver about the result.
+                if (returnResultToDriver)
+                {   
+                    _currentStatus.SetResult(resultReturnedByTask);
+                }
+
+                // If the ITask.Dispose() method threw an Exception, crash the Evaluator.
+                if (exceptionThrownByTaskDispose != null)
+                {
+                    throw exceptionThrownByTaskDispose;
+                }               
             });
 
             taskThread.Start();

http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index a195333..f906ec9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -110,9 +110,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             {
                 _result = Optional<byte[]>.OfNullable(result);
 
-                // This can throw an Exception.
-                _taskLifeCycle.Stop();
-
                 switch (State)
                 {
                     case TaskState.SuspendRequested:
@@ -141,6 +138,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             }
         }
 
+        /// <summary>
+        /// Runs the Task Start Handlers
+        /// </summary>
+        /// <exception cref="TaskStartHandlerException">If any of the Task Start Handlers throws an exception</exception>
+        public void RunTaskStartHandlers()
+        {
+            _taskLifeCycle.Start();
+        }
+
+        /// <summary>
+        /// Runs the Task Stop Handlers
+        /// </summary>
+        /// <exception cref="TaskStopHandlerException">If any of the Task Stop Handlers throws an exception</exception>
+        public void RunTaskStopHandlers()
+        {
+            _taskLifeCycle.Stop();
+        }
+
         public void SetRunning()
         {
             lock (_heartBeatManager)
@@ -148,7 +163,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
                 if (_state == TaskState.Init)
                 {
-                    _taskLifeCycle.Start();
                     State = TaskState.Running;
                     LOGGER.Log(Level.Verbose, "Sending task Running heartbeat");
                     Heartbeat();
@@ -202,6 +216,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             return _state != TaskState.Running;
         }
 
+        /// <summary>
+        /// Check whether the task is in state `Running`
+        /// </summary>
+        /// <returns>true if the task is in state `Running`</returns>
+        public bool IsRunning()
+        {
+            return _state == TaskState.Running;
+        }
+
         public bool HasEnded()
         {
             switch (_state)

http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
index c8078b0..489dcab 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
@@ -79,9 +79,9 @@ namespace Org.Apache.REEF.Evaluator.Tests
             var task = injector.GetInstance<TestTask>();
             task.FinishCountdownEvent.Wait();
             task.DisposeCountdownEvent.Wait();
+            taskThread.Join();
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done);
             Assert.True(taskRuntime.HasEnded());
-            taskThread.Join();
         }
 
         /// <summary>