You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/07/01 21:26:20 UTC

reef git commit: [REEF-1447] Validate Task close failure => FailedTask Event (task has not yet finished)

Repository: reef
Updated Branches:
  refs/heads/master 641cb59c0 -> 4ab6a8d1b


[REEF-1447] Validate Task close failure => FailedTask Event (task has not yet finished)

This addresses the issue by:
  * Failing the Evaluator on an `Exception` in `TaskCloseHandler`.
  * Adding a test for close event failure while the Task is still running, validating that the `TaskCloseHandler` `Exception` triggers a `FailedEvaluator`.
  * Modifying `TaskRuntime` and related tests to verify that the `TaskStopHandler` is not called upon `Task` failure.

JIRA:
  [REEF-1447](https://issues.apache.org/jira/browse/REEF-1447)

This closes #1042


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4ab6a8d1
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4ab6a8d1
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4ab6a8d1

Branch: refs/heads/master
Commit: 4ab6a8d1beb4016848174bbecf82782e7145a35f
Parents: 641cb59
Author: Andrew Chung <af...@gmail.com>
Authored: Thu Jun 9 16:59:36 2016 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Fri Jul 1 14:24:36 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  11 +-
 .../Runtime/Evaluator/Task/TaskStatus.cs        |  26 +--
 .../TaskRuntimeTests.cs                         |   7 +-
 .../Functional/Bridge/TestCloseTask.cs          | 103 ++---------
 .../Functional/Bridge/TestDisposeTasks.cs       |  26 ++-
 .../Functional/Common/EventHandle.cs            |  59 +++++++
 .../Task/Handlers/ExceptionThrowingHandler.cs   |  19 +-
 .../Failure/User/TaskCloseExceptionTest.cs      | 173 +++++++++++++++++++
 .../FaultTolerant/TestResubmitTask.cs           |  26 ++-
 .../Org.Apache.REEF.Tests.csproj                |   3 +-
 10 files changed, 302 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index a6480e8..2feccd2 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -41,6 +41,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         private readonly IInjectionFuture<IObserver<ISuspendEvent>> _suspendHandlerFuture;
         private readonly IInjectionFuture<IObserver<ICloseEvent>> _closeHandlerFuture;
         private int _taskRan = 0;
+        private int _taskClosed = 0;
 
         [Inject]
         private TaskRuntime(
@@ -165,6 +166,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         public void Close(byte[] message)
         {
             Logger.Log(Level.Info, "Trying to close Task {0}", TaskId);
+            if (Interlocked.Exchange(ref _taskClosed, 1) != 0)
+            {
+                // Return if we have already called close. This can happen when TaskCloseHandler
+                // is invoked and throws an Exception before the Task is completed. The control flows
+                // to failing the Evaluator, which eventually tries to close the Task again on Dispose.
+                return;
+            }
 
             if (_currentStatus.IsNotRunning())
             {
@@ -178,8 +186,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             }
             catch (Exception e)
             {
-                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger);
-                _currentStatus.SetException(e);
+                Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Error during Close.", Logger);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index 5e4ca2f..a195333 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -98,20 +98,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         {
             lock (_heartBeatManager)
             {
-                try
-                {
-                    if (!_lastException.IsPresent())
-                    {
-                        _lastException = Optional<Exception>.Of(e);
-                    }
-
-                    State = TaskState.Failed;
-                    _taskLifeCycle.Stop();
-                }
-                finally
-                {
-                    Heartbeat();
-                }
+                _lastException = Optional<Exception>.Of(e);
+                State = TaskState.Failed;
+                Heartbeat();
             }
         }
 
@@ -120,6 +109,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             lock (_heartBeatManager)
             {
                 _result = Optional<byte[]>.OfNullable(result);
+
+                // This can throw an Exception.
                 _taskLifeCycle.Stop();
 
                 switch (State)
@@ -132,6 +123,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                         State = TaskState.Done;
                         break;
                 }
+                
                 Heartbeat();
             }
         }
@@ -354,9 +346,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         {
             if (_result.IsPresent() && _lastException.IsPresent())
             {
-                LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArraysToString(_result.Value));
-                State = TaskState.Failed;
-                _result = Optional<byte[]>.Empty();
+                throw new ApplicationException(
+                    string.Format("Both Exception and Result are present. One of the Threads have already sent a result back." +
+                    "Result returned [{0}]. Exception was [{1}]. Failing the Evaluator.", _result.Value, _lastException.Value));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
index 5e673e5..c8078b0 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs
@@ -206,7 +206,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
 
         /// <summary>
         /// Tests whether task start and stop handlers are properly instantiated and invoked
-        /// on the failure of a task.
+        /// on the failure of a task. On failure, TaskStop handler should not be invoked.
         /// </summary>
         [Fact]
         public void TestTaskEventsOnFailure()
@@ -232,8 +232,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 throw new Exception("Event handler is not expected to be null.");
             }
 
-            Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent());
-            Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId);
+            Assert.False(testTaskEventStopHandler.StopInvoked.IsPresent());
 
             taskThread.Join();
         }
@@ -337,7 +336,7 @@ namespace Org.Apache.REEF.Evaluator.Tests
                 throw new Exception("Event handler is not expected to be null.");
             }
 
-            Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent());
+            Assert.False(testTaskEventStopHandler.StopInvoked.IsPresent());
             Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed);
 
             taskRuntime.Suspend(null);

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index a4b28cc..d50489c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -98,7 +98,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
             TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, "TestEnforceCloseTask", "local", testFolder);
-            ValidateSuccessForLocalRuntime(1, 1, 0, testFolder);
+            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
             ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 0);
             var messages = new List<string>();
             messages.Add(DisposeMessageFromDriver);
@@ -108,29 +108,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
-        /// This test is to close a running task with exception throw in close handler
-        /// Expect to receive Exception in Failed Task event handler in driver
-        /// </summary>
-        [Fact]
-        public void TestStopTaskWithExceptionOnLocalRuntime()
-        {
-            const string successIndication = "EXIT: ActiveContextClr2Java::Close";
-            const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
-
-            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForFailToCloseTask()), typeof(CloseTaskTestDriver), 1, "testStopTaskWithException", "local", testFolder);
-            var messages = new List<string>();
-            messages.Add(successIndication);
-            messages.Add(failedTaskIndication);
-            ValidateMessageSuccessfullyLogged(messages, "driver", DriverStdout, testFolder, 1);
-
-            var messages1 = new List<string>();
-            messages1.Add(DisposeMessageFromDriver);
-            ValidateMessageSuccessfullyLogged(messages1, "Node-*", EvaluatorStdout, testFolder, 2);
-            CleanUp(testFolder);
-        }
-
-        /// <summary>
         /// This test is to close a running task over the bridge without close handler bound
         /// Expect to get TaskCloseHandlerNotBoundException
         /// </summary>
@@ -143,7 +120,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForNoCloseHandlerTask()), typeof(CloseTaskTestDriver), 1, "testStopTaskWithNoCloseHandler", "local", testFolder);
             var messages = new List<string>();
             messages.Add(closeHandlerNoBound);
-            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1);
             CleanUp(testFolder);
         }
 
@@ -195,15 +172,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Build();
         }
 
-        private IConfiguration GetTaskConfigurationForFailToCloseTask()
-        {
-            return TaskConfiguration.ConfigurationModule
-                .Set(TaskConfiguration.Identifier, "TaskID-FailToClose")
-                .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class)
-                .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class)
-                .Build();
-        }
-
         private IConfiguration GetTaskConfigurationForNoCloseHandlerTask()
         {
             return TaskConfiguration.ConfigurationModule
@@ -225,6 +193,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskFailed, GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<CloseTaskTestDriver>.Class)
                 .Build();
 
             AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
@@ -252,6 +221,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             IObserver<IActiveContext>,
             IObserver<ICompletedTask>,
             IObserver<IFailedTask>,
+            IObserver<IFailedEvaluator>,
             IObserver<IRunningTask>           
         {
             private readonly IEvaluatorRequestor _requestor;
@@ -295,6 +265,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 value.ActiveContext.Dispose();
             }
 
+            public void OnNext(IFailedEvaluator value)
+            {
+                Assert.True(value.FailedTask.IsPresent());
+                Assert.Equal(value.FailedTask.Value.Id, "TaskID-EnforceToClose");
+                Assert.Contains(TaskManager.TaskKilledByDriver, value.EvaluatorException.InnerException.Message);
+            }
+
             public void OnNext(IFailedTask value)
             {
                 var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value);
@@ -308,11 +285,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 {
                     Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, failedExeption);
                 }
-                if (value.Id.EndsWith("TaskID-EnforceToClose"))
-                {
-                    Assert.Contains(TaskManager.TaskKilledByDriver, failedExeption);
-                }
-
+                
                 value.GetActiveContext().Value.Dispose();
             }
 
@@ -499,58 +472,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
-        /// This is a test task for the scenario in which the task receives close event, instead of
-        /// let the task to return properly, it throws exception.
-        /// </summary>
-        private sealed class CloseByThrowExceptionTask : ITask, IObserver<ICloseEvent>
-        {
-            private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);
-
-            [Inject]
-            private CloseByThrowExceptionTask()
-            {
-            }
-
-            public byte[] Call(byte[] memento)
-            {
-                Logger.Log(Level.Info, "Hello in FailtToCloseTask");
-                _suspendSignal.Wait();
-                return null;
-            }
-
-            public void Dispose()
-            {
-                Logger.Log(Level.Info, "Task is disposed.");
-            }
-
-            public void OnNext(ICloseEvent value)
-            {
-                try
-                {
-                    if (value.Value != null && value.Value.Value != null)
-                    {
-                        Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value));
-                        Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), DisposeMessageFromDriver);
-                    }
-                }
-                finally
-                {
-                    throw new Exception(FailToCloseTaskMessage);
-                }
-            }
-
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-
-            public void OnError(Exception error)
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        /// <summary>
         /// This task doesn't implement close handler. It is to test closeHandlerNoBound exception.
         /// </summary>
         private sealed class MissingCloseHandlerTask : ITask

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
index 7e8702c..f7ebd2c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
@@ -20,7 +20,6 @@ using System.Collections.Generic;
 using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Context;
-using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Driver;
@@ -32,12 +31,13 @@ using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions;
 using Org.Apache.REEF.Utilities.Logging;
 using Xunit;
 
 namespace Org.Apache.REEF.Tests.Functional.Bridge
 {
+    [Collection("FunctionalTests")]
     public class TestDisposeTasks : ReefFunctionalTest
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(TestDisposeTasks));
@@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             string testFolder = DefaultRuntimeFolder + TestId;
             TestRun(DriverConfigurations(2), typeof(TestDisposeTasks), 1, "TestDisposeTasks", "local", testFolder);
-            ValidateSuccessForLocalRuntime(1, 1, 0, testFolder);
+            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
             var messages = new List<string> { TaskIsDisposed };
             ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
             CleanUp(testFolder);
@@ -109,7 +109,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Set(DriverConfiguration.OnContextActive, GenericType<DisposeTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskRunning, GenericType<DisposeTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskCompleted, GenericType<DisposeTaskTestDriver>.Class)
-                .Set(DriverConfiguration.OnTaskFailed, GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<DisposeTaskTestDriver>.Class)
                 .Build();
 
             return Configurations.Merge(taskIdConfig, driverConfig);
@@ -123,7 +123,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             IObserver<IAllocatedEvaluator>,
             IObserver<IActiveContext>,
             IObserver<ICompletedTask>,
-            IObserver<IFailedTask>,
+            IObserver<IFailedEvaluator>,
             IObserver<IRunningTask>            
         {
             private readonly IEvaluatorRequestor _requestor;            
@@ -167,20 +167,16 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             /// And verify the context associated with the failed task is the same as the context that the task was submitted
             /// </summary>
             /// <param name="value"></param>
-            public void OnNext(IFailedTask value)
+            public void OnNext(IFailedEvaluator value)
             {
-                Assert.Equal(TaskId + "2", value.Id);
+                Assert.True(value.FailedTask.IsPresent());
+                Assert.Equal(TaskId + "2", value.FailedTask.Value.Id);
 
-                var failedException = ByteUtilities.ByteArraysToString(value.Data.Value);
-                var e = value.AsError();
+                var e = value.EvaluatorException.InnerException;
                 Logger.Log(Level.Error, "In IFailedTask: e.type: {0}, e.message: {1}.", e.GetType(), e.Message);
-                Logger.Log(Level.Error, "In IFailedTask: value.Data.Value: {0}, value.Message {1}.", failedException, value.Message);
 
-                Assert.Equal(typeof(Exception), e.GetType());
+                Assert.Equal(typeof(TestSerializableException), e.GetType());
                 Assert.Equal(TaskKilledByDriver, e.Message);
-                Assert.Contains(TaskKilledByDriver, failedException);
-
-                value.GetActiveContext().Value.Dispose();
             }
 
             /// <summary>
@@ -272,7 +268,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                         }
                         else if (msg.Equals(ExitByException))
                         {
-                            throw new Exception(TaskKilledByDriver);
+                            throw new TestSerializableException(TaskKilledByDriver);
                         }
                     }
                     else

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs
new file mode 100644
index 0000000..b30302a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs
@@ -0,0 +1,59 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Tests.Functional.Common
+{
+    /// <summary>
+    /// A test EventHandle that simply wraps around a <see cref="ManualResetEventSlim"/>.
+    /// </summary>
+    public sealed class EventHandle
+    {
+        private readonly ManualResetEventSlim _eventHandle = new ManualResetEventSlim();
+
+        [Inject]
+        private EventHandle()
+        {
+        }
+
+        /// <summary>
+        /// Sets the Event.
+        /// </summary>
+        public void Signal()
+        {
+            _eventHandle.Set();
+        }
+
+        /// <summary>
+        /// Waits for the event signal.
+        /// </summary>
+        public void Wait()
+        {
+            _eventHandle.Wait();
+        }
+
+        /// <summary>
+        /// Resets the event signal.
+        /// </summary>
+        public void Reset()
+        {
+            _eventHandle.Reset();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
index 6e66dac..b24f26e 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
@@ -26,23 +26,28 @@ namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers
     internal abstract class ExceptionThrowingHandler<T> : IObserver<T>
     {
         private readonly Exception _exceptionToThrow;
-        private readonly Action<T> _action;
+        private readonly Action<T> _finallyAction;
 
         protected ExceptionThrowingHandler(
-            Exception exceptionToThrow, Action<T> action = null)
+            Exception exceptionToThrow, Action<T> finallyAction = null)
         {
             _exceptionToThrow = exceptionToThrow;
-            _action = action;
+            _finallyAction = finallyAction;
         }
 
         public void OnNext(T value)
         {
-            if (_action != null)
+            try
             {
-                _action(value);
+                throw _exceptionToThrow;
+            }
+            finally
+            {
+                if (_finallyAction != null)
+                {
+                    _finallyAction(value);
+                }
             }
-
-            throw _exceptionToThrow;
         }
 
         public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs
new file mode 100644
index 0000000..8571531
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs
@@ -0,0 +1,173 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions;
+using Org.Apache.REEF.Tests.Functional.Common;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    /// <summary>
+    /// This test class contains a test that validates that an Exception in the
+    /// TaskCloseHandler causes a FailedEvaluator event in the Driver.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TaskCloseExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskCloseExceptionTest));
+
+        private const string TaskCloseExceptionMessage = "TaskCloseExceptionMessage";
+        private const string FailedEvaluatorReceived = "FailedEvaluatorReceived";
+
+        /// <summary>
+        /// This test validates that an Exception in the TaskCloseHandler fails the Evaluator,
+        /// so that the Driver receives a FailedEvaluator event carrying the failed Task.
+        /// </summary>
+        [Fact]
+        public void TestCloseTaskWithExceptionOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskCloseExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskCloseExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TaskCloseExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, GenericType<TaskCloseExceptionTestDriver>.Class)
+                .Build(), typeof(TaskCloseExceptionTestDriver), 1, "testCloseTaskWithExceptionOnLocalRuntime", "local", testFolder);
+
+            var driverMessages = new List<string>
+            {
+                FailedEvaluatorReceived
+            };
+
+            ValidateSuccessForLocalRuntime(numberOfContextsToClose: 0, numberOfEvaluatorsToFail: 1, testFolder: testFolder);
+            ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1);
+            ValidateMessageSuccessfullyLogged(driverMessages, "driver", DriverStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        private sealed class TaskCloseExceptionTestDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IRunningTask>,
+            IObserver<IFailedEvaluator>
+        {
+            private static readonly string TaskId = "TaskId";
+
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TaskCloseExceptionTestDriver(IEvaluatorRequestor requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                // submit the first Task.
+                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, TaskId)
+                        .Set(TaskConfiguration.Task, GenericType<TaskCloseExceptionTask>.Class)
+                        .Set(TaskConfiguration.OnClose, GenericType<TaskCloseHandlerWithException>.Class)
+                        .Build());
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                if (value.Id == TaskId)
+                {
+                    value.Dispose();
+                }
+            }
+
+            public void OnNext(IFailedEvaluator value)
+            {
+                Assert.True(value.FailedTask.IsPresent());
+                var failedTask = value.FailedTask.Value;
+
+                Assert.Equal(TaskId, failedTask.Id);
+
+                // Check that Exceptions are deserialized correctly.
+                var ex = value.EvaluatorException.InnerException;
+                if (ex == null)
+                {
+                    throw new Exception("Exception was not expected to be null.");
+                }
+
+                var taskCloseEx = ex as TestSerializableException;
+
+                if (taskCloseEx == null)
+                {
+                    throw new Exception("Expected Exception to be of type TaskCloseExceptionTestException, but instead got type " + ex.GetType().Name);
+                }
+
+                if (taskCloseEx.Message != TaskCloseExceptionMessage)
+                {
+                    throw new Exception(
+                        "Expected message to be " + TaskCloseExceptionMessage + " but instead got " + taskCloseEx.Message + ".");
+                }
+
+                Logger.Log(Level.Info, FailedEvaluatorReceived);
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class TaskCloseExceptionTask : WaitingTask
+        {
+            [Inject]
+            private TaskCloseExceptionTask(EventMonitor monitor) : base(monitor)
+            {
+            }
+        }
+
+        private sealed class TaskCloseHandlerWithException : ExceptionThrowingHandler<ICloseEvent>
+        {
+            [Inject]
+            private TaskCloseHandlerWithException(EventMonitor monitor) : base(
+                new TestSerializableException(TaskCloseExceptionMessage),
+                close => { monitor.Signal(); })
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
index 8e2962f..675dc5d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Context;
@@ -74,7 +75,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
         {
             string testFolder = DefaultRuntimeFolder + TestId;
             TestRun(DriverConfigurations(), typeof(ResubmitTaskTestDriver), 2, "TestResubimitTask", "local", testFolder);
-            ValidateSuccessForLocalRuntime(2, 2, 0, testFolder);
+            ValidateSuccessForLocalRuntime(1, 0, 1, testFolder);
             CleanUp(testFolder);
         }
 
@@ -90,7 +91,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
                 .Set(DriverConfiguration.OnContextActive, GenericType<ResubmitTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitTaskTestDriver>.Class)
                 .Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitTaskTestDriver>.Class)
-                .Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<ResubmitTaskTestDriver>.Class)
                 .Build();
         }
 
@@ -102,7 +103,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
             IObserver<IAllocatedEvaluator>,
             IObserver<IActiveContext>,
             IObserver<ICompletedTask>,
-            IObserver<IFailedTask>,
+            IObserver<IFailedEvaluator>,
             IObserver<IRunningTask>
         {
             private readonly IEvaluatorRequestor _requestor;
@@ -154,19 +155,17 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
             }
 
             /// <summary>
-            /// Verify when exception is shown in task, IFailedTask will be received here with the message set in the task
-            /// And verify the context associated with the failed task is the same as the context that the task was submitted
+            /// Verify that when an exception is thrown in TaskCloseHandler, an IFailedEvaluator is received here with the message set in the task.
             /// </summary>
-            /// <param name="value"></param>
-            public void OnNext(IFailedTask value)
+            public void OnNext(IFailedEvaluator value)
             {
-                var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value);
-                Logger.Log(Level.Error, "In IFailedTask: " + failedExeption);
+                Assert.True(value.FailedTask.IsPresent());
+                var failedExeption = value.EvaluatorException.InnerException;
+                Assert.Contains(TaskKilledByDriver, failedExeption.Message);
 
-                VerifyContextTaskMapping(value.Id, value.GetActiveContext().Value.Id);
-                Assert.Contains(TaskKilledByDriver, failedExeption);
+                Logger.Log(Level.Error, "In IFailedEvaluator: " + failedExeption);
 
-                OnNext(value.GetActiveContext().Value);
+                VerifyContextTaskMapping(value.FailedTask.Value.Id, value.FailedContexts.Single().Id);
             }
 
             private void VerifyContextTaskMapping(string taskId, string contextId)
@@ -189,11 +188,10 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
                 switch (value.Id)
                 {
                     case TaskId + "1":
-                    case TaskId + "2":
                         value.Dispose(Encoding.UTF8.GetBytes(KillTaskCommandFromDriver));
                         break;
+                    case TaskId + "2":
                     case TaskId + "3":
-                    case TaskId + "4":
                         value.Send(Encoding.UTF8.GetBytes(CompleteTaskCommandFromDriver));
                         break;
                     default: 

http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index aed5d5c..f56b2bb 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -84,7 +84,6 @@ under the License.
     <Compile Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ContextStartExceptionTest.cs" />
-    <Compile Include="Functional\Common\Task\WaitingTask.cs" />
     <Compile Include="Functional\Failure\User\ReceiveTaskMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" />
     <Compile Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" />
@@ -97,6 +96,7 @@ under the License.
     <Compile Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" />
     <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" />
+    <Compile Include="Functional\Common\Task\WaitingTask.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
     <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" />
     <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithRunningTaskDriver.cs" />
@@ -107,6 +107,7 @@ under the License.
     <Compile Include="Functional\Failure\TestEvaluatorWithActiveContextImmediatePoison.cs" />
     <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" />
     <Compile Include="Functional\Failure\SleepTask.cs" />
+    <Compile Include="Functional\Failure\User\TaskCloseExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\TaskConstructorExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\TaskStopExceptionTest.cs" />
     <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />