You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2017/05/03 22:16:40 UTC
reef git commit: [REEF-1778] Ensure ITask.Dispose() is executed
Repository: reef
Updated Branches:
refs/heads/master 5e7d99be3 -> 2ce19a571
[REEF-1778] Ensure ITask.Dispose() is executed
This moves the reporting of the result of a task *after* the call to `Dispose`
on that task to ensure execution of that method.
To do so, I made the calls to the task start and stop handlers explicit, and
kept track in `TaskRuntime` of any exception thrown and of whether a result
needs to be sent.
Also, this fixes a bug in `TaskRuntimeTests` which did not wait for the
spawned thread to finish before validation of the test results.
JIRA:
[REEF-1778](https://issues.apache.org/jira/browse/REEF-1778)
Pull Request:
This closes #1293
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2ce19a57
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2ce19a57
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2ce19a57
Branch: refs/heads/master
Commit: 2ce19a571df020a9f2114d33bb9be92df5cc7ca1
Parents: 5e7d99b
Author: Markus Weimer <we...@apache.org>
Authored: Mon Apr 17 18:50:03 2017 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Wed May 3 15:14:24 2017 -0700
----------------------------------------------------------------------
.../Runtime/Evaluator/Task/TaskRuntime.cs | 51 +++++++++++++++-----
.../Runtime/Evaluator/Task/TaskStatus.cs | 31 ++++++++++--
.../TaskRuntimeTests.cs | 2 +-
3 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 39c51f6..863b30d 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -93,54 +93,81 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
var taskThread = new Thread(() =>
{
+ // The result of the task execution.
+ byte[] resultReturnedByTask = null;
+
+ // Whether or not a result shall be returned to the Driver.
+ bool returnResultToDriver = true;
+
+ // Exception thrown during `Dispose`, if any.
+ Exception exceptionThrownByTaskDispose = null;
+
try
{
+ // Run the handlers for `TaskStart`
Logger.Log(Level.Verbose, "Set running status for task");
+ _currentStatus.RunTaskStartHandlers();
+
+ // Update the state
_currentStatus.SetRunning();
+
+ // Call the Task
Logger.Log(Level.Verbose, "Calling into user's task.");
- var result = _userTask.Call(null);
+ resultReturnedByTask = _userTask.Call(null);
Logger.Log(Level.Info, "Task Call Finished");
- _currentStatus.SetResult(result);
- const Level resultLogLevel = Level.Verbose;
+ // Run the handlers for `TaskStop`
+ _currentStatus.RunTaskStopHandlers();
- if (Logger.IsLoggable(resultLogLevel) && result != null && result.Length > 0)
+ // Log the result
+ const Level resultLogLevel = Level.Verbose;
+ if (Logger.IsLoggable(resultLogLevel) && resultReturnedByTask != null && resultReturnedByTask.Length > 0)
{
Logger.Log(resultLogLevel,
- "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+ "Task running result:\r\n" + System.Text.Encoding.Default.GetString(resultReturnedByTask));
}
}
catch (TaskStartHandlerException e)
{
Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException");
_currentStatus.SetException(e.InnerException);
+ returnResultToDriver = false;
}
catch (TaskStopHandlerException e)
{
Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException");
_currentStatus.SetException(e.InnerException);
+ returnResultToDriver = false;
}
catch (Exception e)
{
Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType());
_currentStatus.SetException(e);
+ returnResultToDriver = false;
}
finally
{
try
{
- if (_userTask != null)
- {
- _userTask.Dispose();
- }
+ _userTask.Dispose();
}
catch (Exception e)
{
- var msg = "Exception during Task Dispose in task Call()";
- Logger.Log(Level.Error, msg);
- throw new InvalidOperationException(msg, e);
+ exceptionThrownByTaskDispose = new InvalidOperationException("Exception during Task Dispose in task Call()", e);
}
}
+
+ // Inform the driver about the result.
+ if (returnResultToDriver)
+ {
+ _currentStatus.SetResult(resultReturnedByTask);
+ }
+
+ // If the ITask.Dispose() method threw an Exception, crash the Evaluator.
+ if (exceptionThrownByTaskDispose != null)
+ {
+ throw exceptionThrownByTaskDispose;
+ }
});
taskThread.Start();
http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index a195333..f906ec9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -110,9 +110,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
_result = Optional<byte[]>.OfNullable(result);
- // This can throw an Exception.
- _taskLifeCycle.Stop();
-
switch (State)
{
case TaskState.SuspendRequested:
@@ -141,6 +138,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
}
}
+ /// <summary>
+ /// Runs the Task Start Handlers
+ /// </summary>
+ /// <exception cref="TaskStartHandlerException">If any of the Task Start Handlers throws an exception</exception>
+ public void RunTaskStartHandlers()
+ {
+ _taskLifeCycle.Start();
+ }
+
+ /// <summary>
+ /// Runs the Task Stop Handlers
+ /// </summary>
+ /// <exception cref="TaskStopHandlerException">If any of the Task Stop Handlers throws an exception</exception>
+ public void RunTaskStopHandlers()
+ {
+ _taskLifeCycle.Stop();
+ }
+
public void SetRunning()
{
lock (_heartBeatManager)
@@ -148,7 +163,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
if (_state == TaskState.Init)
{
- _taskLifeCycle.Start();
State = TaskState.Running;
LOGGER.Log(Level.Verbose, "Sending task Running heartbeat");
Heartbeat();
@@ -202,6 +216,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
return _state != TaskState.Running;
}
+ /// <summary>
+ /// Check whether the task is in state `Running`
+ /// </summary>
+ /// <returns>true, the task is in state `Running`</returns>
+ public bool IsRunning()
+ {
+ return _state == TaskState.Running;
+ }
+
public bool HasEnded()
{
switch (_state)
http://git-wip-us.apache.org/repos/asf/reef/blob/2ce19a57/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
index c8078b0..489dcab 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
@@ -79,9 +79,9 @@ namespace Org.Apache.REEF.Evaluator.Tests
var task = injector.GetInstance<TestTask>();
task.FinishCountdownEvent.Wait();
task.DisposeCountdownEvent.Wait();
+ taskThread.Join();
Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done);
Assert.True(taskRuntime.HasEnded());
- taskThread.Join();
}
/// <summary>