You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/20 23:06:59 UTC

reef git commit: [REEF-1492] Properly handle exception in ResultHandler.Dispose()

Repository: reef
Updated Branches:
  refs/heads/master 415bb9eb3 -> abba4735b


[REEF-1492] Properly handle exception in ResultHandler.Dispose()

This change fixes several issues related to ResultHandler.Dispose():

* The sample implementation of ResultHandler uploads the local file to the remote
  location in its Dispose method, which is undesirable. In recovery scenarios,
  Dispose can be called on every retry before any result has been written to the
  local file. The upload should only happen after the result is written locally.
* The Dispose method should only release resources and should not contain complex
  logic, to reduce the chance that it fails.
* ResultHandler.Dispose() was called from the FinallyBlock() method in TaskHost.
  It should instead be called from the task's Dispose() (UpdateTaskHost.Dispose()),
  so that it shares the exception-handling logic applied to the task's Dispose().

JIRA:
  [REEF-1492](https://issues.apache.org/jira/browse/REEF-1492)

Pull request:
  This closes #1205


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/abba4735
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/abba4735
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/abba4735

Branch: refs/heads/master
Commit: abba4735b8a7d3932e1b9482bce3b06019cd4f24
Parents: 415bb9e
Author: Julia Wang <ju...@apache.org>
Authored: Thu Dec 15 14:36:40 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Tue Dec 20 15:06:01 2016 -0800

----------------------------------------------------------------------
 .../OnREEF/IMRUTasks/TaskHostBase.cs            |   8 -
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  18 +--
 .../OnREEF/ResultHandler/WriteResultHandler.cs  |  23 ++-
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |  64 ++------
 .../IMRU/TestExceptionInResultHandlerDispose.cs | 156 +++++++++++++++++++
 .../Functional/IMRU/TestFailMapperEvaluators.cs |   1 +
 ...valuatorsWithFailedResultHandlerOnDispose.cs | 107 +++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   2 +
 8 files changed, 298 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
index 5d56fd2..718a794 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
@@ -111,7 +111,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             }
             finally
             {
-                FinallyBlock();
                 _taskCloseCoordinator.SignalTaskStopped();
             }
             Logger.Log(Level.Info, "{0} returned with cancellation token:{1}.", TaskHostName, _cancellationSource.IsCancellationRequested);
@@ -135,13 +134,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         protected abstract byte[] TaskBody(byte[] memento);
 
         /// <summary>
-        /// The code that needs to be executed no matter exception happens or not in Call() method.  
-        /// </summary>
-        protected virtual void FinallyBlock()
-        {
-        }
-
-        /// <summary>
         /// Task host name
         /// </summary>
         protected abstract string TaskHostName { get; }

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 5fb1110..0afb8d3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -70,6 +70,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 _communicationGroupClient.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
             _dataReceiver = _communicationGroupClient.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _resultHandler = resultHandler;
+            Logger.Log(Level.Info, "$$$$_resultHandler." + _resultHandler.GetType().AssemblyQualifiedName);
             Logger.Log(Level.Info, "UpdateTaskHost initialized.");
         }
 
@@ -132,22 +133,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         }
 
         /// <summary>
-        /// Dispose resultHandler
-        /// </summary>
-        protected override void FinallyBlock()
-        {
-            try
-            {
-                _resultHandler.Dispose();
-            }
-            catch (Exception e)
-            {
-                Logger.Log(Level.Error, "Exception in dispose result handler.", e);
-                //// TODO throw proper exceptions JIRA REEF-1492
-            }
-        }
-
-        /// <summary>
         /// Return UpdateHostName
         /// </summary>
         protected override string TaskHostName
@@ -159,6 +144,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         {
             if (Interlocked.Exchange(ref _disposed, 1) == 0)
             {
+                _resultHandler.Dispose();
                 _groupCommunicationsClient.Dispose();
                 var disposableTask = _updateTask as IDisposable;
                 if (disposableTask != null)

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
index 3a10e89..8d91a5c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
@@ -40,7 +40,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
         private readonly IStreamingCodec<TResult> _resultCodec;
         private readonly IFileSystem _fileSystem;
         private readonly string _remoteFileName;
-        private readonly string _localFilename;
+        private string _localFilename;
 
         [Inject]
         private WriteResultHandler(
@@ -66,13 +66,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
             }
 
             WriteOutput(value);
+            UploadResultFile();
         }
 
         /// <summary>
-        /// Handles what to do on completion
-        /// In this case write to remote location
+        /// Upload local file to remote using the FileSystem
         /// </summary>
-        public void Dispose()
+        private void UploadResultFile()
         {
             if (string.IsNullOrWhiteSpace(_remoteFileName))
             {
@@ -84,12 +84,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
             if (_fileSystem.Exists(remoteUri))
             {
                 Exceptions.Throw(
-                    new Exception(string.Format("Output Uri: {0} already exists", remoteUri)), Logger);
+                    new Exception(string.Format("Output Uri: {0} already exists", remoteUri)),
+                    Logger);
             }
 
             _fileSystem.CopyFromLocal(_localFilename, remoteUri);
         }
 
+        /// <summary>
+        /// Deletes the local result file, skipping it if it was never written; safe to call repeatedly.
+        /// </summary>
+        public void Dispose()
+        {
+            if (_localFilename != null && File.Exists(_localFilename))
+            {
+                File.Delete(_localFilename);
+            }
+            _localFilename = null;
+        }
+
         private string GenerateLocalFilename()
         {
             string localFileFolder = Path.GetTempPath() + "-partition-" + Guid.NewGuid().ToString("N").Substring(0, 8);

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index dae056f..884193f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -16,7 +16,6 @@
 // under the License.
 
 using System;
-using System.Diagnostics;
 using System.Globalization;
 using System.IO;
 using System.Linq;
@@ -26,6 +25,7 @@ using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
 using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
@@ -59,11 +59,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Abstract method for subclass to override it to provide configurations for driver handlers 
         /// </summary>
-        /// <typeparam name="TMapInput"></typeparam>
-        /// <typeparam name="TMapOutput"></typeparam>
-        /// <typeparam name="TResult"></typeparam>
-        /// <typeparam name="TPartitionType"></typeparam>
-        /// <returns></returns>
         protected abstract IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>();
 
         /// <summary>
@@ -72,16 +67,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// then calls TestRun for running the test.
         /// Subclass can override it if they have different parameters for the test
         /// </summary>
-        /// <param name="runOnYarn"></param>
-        /// <param name="numTasks"></param>
-        /// <param name="chunkSize"></param>
-        /// <param name="dims"></param>
-        /// <param name="iterations"></param>
-        /// <param name="mapperMemory"></param>
-        /// <param name="numberOfRetryInRecovery"></param>
-        /// <param name="updateTaskMemory"></param>
-        /// <param name="testFolder"></param>
-        /// <param name="numberOfChecksBeforeCancellingJob"></param>
         protected void TestBroadCastAndReduce(bool runOnYarn,
             int numTasks,
             int chunkSize,
@@ -107,13 +92,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Build driver configuration
         /// </summary>
-        /// <typeparam name="TMapInput"></typeparam>
-        /// <typeparam name="TMapOutput"></typeparam>
-        /// <typeparam name="TResult"></typeparam>
-        /// <typeparam name="TPartitionType"></typeparam>
-        /// <param name="jobDefinition"></param>
-        /// <param name="driverHandlerConfig"></param>
-        /// <returns></returns>
         protected IConfiguration DriverConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>(
             IMRUJobDefinition jobDefinition,
             IConfiguration driverHandlerConfig)
@@ -179,13 +157,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Create group communication configuration
         /// </summary>
-        /// <typeparam name="TMapInput"></typeparam>
-        /// <typeparam name="TMapOutput"></typeparam>
-        /// <typeparam name="TResult"></typeparam>
-        /// <typeparam name="TPartitionType"></typeparam>
-        /// <param name="numberOfTasks"></param>
-        /// <param name="driverId"></param>
-        /// <returns></returns>
         private IConfiguration CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>(
             int numberOfTasks,
             string driverId)
@@ -203,15 +174,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Create IMRU Job Definition with IMRU required configurations
         /// </summary>
-        /// <param name="numberofMappers"></param>
-        /// <param name="chunkSize"></param>
-        /// <param name="numIterations"></param>
-        /// <param name="dim"></param>
-        /// <param name="mapperMemory"></param>
-        /// <param name="updateTaskMemory"></param>
-        /// <param name="numberOfRetryInRecovery"></param>
-        /// <param name="numberOfChecksBeforeCancellingJob"></param>
-        /// <returns></returns>
         protected virtual IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
             int chunkSize,
             int numIterations,
@@ -230,6 +192,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
                 .SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
                 .SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers))
+                .SetResultHandlerConfiguration(BuildResultHandlerConfig())
                 .SetJobName(IMRUJobName)
                 .SetNumberOfMappers(numberofMappers)
                 .SetMapperMemory(mapperMemory)
@@ -251,12 +214,18 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
+        /// Build default result handler configuration. Subclass can override it.
+        /// </summary>
+        protected virtual IConfiguration BuildResultHandlerConfig()
+        {
+            var defaultHandlerBinding = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IIMRUResultHandler<int[]>>.Class, GenericType<DefaultResultHandler<int[]>>.Class);
+            return defaultHandlerBinding.Build();
+        }
+
+        /// <summary>
         /// Build update function configuration. Subclass can override it.
         /// </summary>
-        /// <param name="numberofMappers"></param>
-        /// <param name="numIterations"></param>
-        /// <param name="dim"></param>
-        /// <returns></returns>
         protected virtual IConfiguration BuildUpdateFunctionConfiguration(int numberofMappers, int numIterations, int dim)
         {
             var updateFunctionConfig =
@@ -274,8 +243,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         ///  Data Converter Configuration. Subclass can override it to have its own test Data Converter.
         /// </summary>
-        /// <param name="chunkSize"></param>
-        /// <returns></returns>
         protected virtual IConfiguration BuildDataConverterConfig(int chunkSize)
         {
             return TangFactory.GetTang()
@@ -290,7 +257,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Mapper function configuration. Subclass can override it to have its own test function.
         /// </summary>
-        /// <returns></returns>
         protected virtual IConfiguration BuildMapperFunctionConfig()
         {
             return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
@@ -302,7 +268,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Set update function to IMRUUpdateConfiguration configuration module. Sub class can override it to set different function.
         /// </summary>
-        /// <returns></returns>
         protected virtual IConfiguration BuildUpdateFunctionConfigModule()
         {
             return IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
@@ -314,8 +279,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Partition dataset configuration. Subclass can override it to have its own test dataset config
         /// </summary>
-        /// <param name="numberofMappers"></param>
-        /// <returns></returns>
         protected virtual IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers)
         {
             return RandomInputDataConfiguration.ConfigurationModule.Set(
@@ -326,7 +289,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Map Input Codec configuration. Subclass can override it to have its own test Codec.
         /// </summary>
-        /// <returns></returns>
         protected virtual IConfiguration BuildMapInputCodecConfig()
         {
             return IMRUCodecConfiguration<int[]>.ConfigurationModule
@@ -337,7 +299,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Update function Codec configuration. Subclass can override it to have its own test Codec.
         /// </summary>
-        /// <returns></returns>
         protected virtual IConfiguration BuildUpdateFunctionCodecsConfig()
         {
             return IMRUCodecConfiguration<int[]>.ConfigurationModule
@@ -348,7 +309,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// Reduce function configuration. Subclass can override it to have its own test function.
         /// </summary>
-        /// <returns></returns>
         protected virtual IConfiguration BuildReduceFunctionConfig()
         {
             return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestExceptionInResultHandlerDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestExceptionInResultHandlerDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestExceptionInResultHandlerDispose.cs
new file mode 100644
index 0000000..d6a8564
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestExceptionInResultHandlerDispose.cs
@@ -0,0 +1,156 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Diagnostics;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestExceptionInResultHandlerDispose : IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// Throws an exception from IIMRUResultHandler.Dispose() and verifies the job still succeeds on the local runtime.
+        /// </summary>
+        [Fact]
+        public void TestExceptionInResultHandlerDisposeOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            int numberOfRetryInRecovery = 4;
+
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+            // The master evaluator fails after the master task has completed. Depending on how quickly the driver disposes
+            // contexts after the master task completes, the driver may or may not receive the IFailedEvaluator event.
+            Assert.True(failedEvaluatorCount <= 1);
+
+            // Scenario1: Driver receives FailedEvaluator caused by dispose of a completed task after all the tasks have completed.
+            //            The FailedEvaluator event is ignored.
+            // Scenario2: Driver receives FailedEvaluator caused by dispose of the master task before all the tasks have completed.
+            //            Driver sends a close event to the rest of the running tasks and enters the shutdown state.
+            //            During this process, some tasks can still complete and some may fail due to communication errors.
+            //            As the evaluator failure happens while the task is disposed, either ICompletedTask or IFailedTask is sent before it.
+            //            Since the remaining contexts are disposed once the master is done, we have
+            //            numTasks >= completedTask# + FailedTask#
+            Assert.True(numTasks >= completedTaskCount + failedTaskCount);
+
+            // As update task completion happens before the update evaluator failure caused by Dispose, the job should eventually succeed.
+            Assert.Equal(1, jobSuccess);
+
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Throws an exception from IIMRUResultHandler.Dispose() on YARN.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public void TestExceptionInResultHandlerDisposerOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            int numberOfRetryInRecovery = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery);
+        }
+
+        /// <summary>
+        /// Defines the driver event handlers. By default, it uses all the handlers defined in IMRUDriver.
+        /// </summary>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Binds ResultHandlerWithException as the IIMRUResultHandler so that its Dispose() throws.
+        /// </summary>
+        protected override IConfiguration BuildResultHandlerConfig()
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IIMRUResultHandler<int[]>>.Class, GenericType<ResultHandlerWithException<int[]>>.Class)
+                    .Build();
+        }
+
+        /// <summary>
+        /// An implementation of IIMRUResultHandler that throws an exception in Dispose()
+        /// </summary>
+        internal sealed class ResultHandlerWithException<TResult> : IIMRUResultHandler<TResult>
+        {
+            [Inject]
+            private ResultHandlerWithException()
+            {
+            }
+
+            /// <summary>
+            /// Ignores the IMRU result produced by the update task.
+            /// </summary>
+            /// <param name="result">The result of IMRU</param>
+            public void HandleResult(TResult result)
+            {
+            }
+
+            /// <summary>
+            /// Simulates a failure by logging a warning and always throwing.
+            /// </summary>
+            public void Dispose()
+            {
+                Logger.Log(Level.Warning, "Simulate exception in ResultHandlerWithException.Dispose.");
+                throw new Exception("Exception from ResultHandlerWithException.Dispose()");
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index cd94f43..df99e0b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -158,6 +158,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
                 .SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
                 .SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers))
+                .SetResultHandlerConfiguration(BuildResultHandlerConfig())
                 .SetJobName(IMRUJobName)
                 .SetNumberOfMappers(numberofMappers)
                 .SetMapperMemory(mapperMemory)

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose.cs
new file mode 100644
index 0000000..1ce7686
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose.cs
@@ -0,0 +1,107 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test fails two mappers during the iterations. When the driver closes the master task,
+        /// the ResultHandler in the update task throws an exception in Dispose().
+        /// Dispose() is called either by TaskRuntime after Call() returns because of the cancellation token,
+        /// or by the finally block in TaskRuntime.Close(), whichever happens first.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+
+            // The driver shuts down as soon as all tasks are in a final state, reached either by
+            // ICompletedTask or IFailedEvaluator. The MessageLogger may therefore miss the last event
+            // before the driver shuts down.
+            var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator");
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+            // All tasks should start running before any failure
+            Assert.Equal(numTasks, runningTaskCount);
+
+            // Each task should complete, fail, or disappear with a failed evaluator
+            Assert.Equal(numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+
+            // The test code fails two mappers and the update evaluator. As the update evaluator failure
+            // happens in Dispose(), the driver may or may not receive its FailedEvaluator before shutting down.
+            Assert.InRange(failedEvaluatorCount, 2, 3);
+
+            // The job eventually fails because the master evaluator fails before the iterations are completed
+            Assert.Equal(0, jobSuccess);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Runs this scenario on YARN
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public override void TestFailedMapperOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+        }
+
+        /// <summary>
+        /// Binds TestExceptionInResultHandlerDispose.ResultHandlerWithException as the IIMRUResultHandler
+        /// </summary>
+        protected override IConfiguration BuildResultHandlerConfig()
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IIMRUResultHandler<int[]>>.Class, GenericType<TestExceptionInResultHandlerDispose.ResultHandlerWithException<int[]>>.Class)
+                    .Build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/abba4735/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index d483c7f..c468584 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -130,6 +130,8 @@ under the License.
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile Include="Functional\IMRU\TestExceptionInResultHandlerDispose.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsWithFailedResultHandlerOnDispose.cs" />
     <Compile Include="Functional\IMRU\TestFailUpdateEvaluatorOnWaitingForEvaluator.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperTasksOnDispose.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnDispose.cs" />