You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/09/28 00:07:35 UTC

reef git commit: [REEF-1451] Clean up IMRU Fault Tolerant scenario tests

Repository: reef
Updated Branches:
  refs/heads/master 1add8399e -> 3ea89be97


[REEF-1451] Clean up IMRU Fault Tolerant scenario tests

This change:
 * extends the failures simulated with FailureType to include failures during task dispose.
 * adds tests for mapper task failure on init and on dispose,
   and for mapper evaluator failure on dispose.
 * cleans up checks and comments in other IMRU FT scenario tests.

JIRA:
  [REEF-1451](https://issues.apache.org/jira/browse/REEF-1451)

Pull request:
  This closes #1132


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3ea89be9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3ea89be9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3ea89be9

Branch: refs/heads/master
Commit: 3ea89be97c20a440bba84c8c33338ee8e480875d
Parents: 1add839
Author: Mariia Mykhailova <ma...@apache.org>
Authored: Mon Sep 26 11:53:29 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Sep 27 17:06:11 2016 -0700

----------------------------------------------------------------------
 .../FaultTolerantPipelinedBroadcastAndReduce.cs | 24 +++--
 .../OnREEF/Driver/IMRUDriver.cs                 |  9 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs |  7 +-
 .../IMRU/TestFailMapperEvaluatorsOnDispose.cs   | 91 +++++++++++++++++++
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      | 17 ++--
 .../Functional/IMRU/TestFailMapperTasks.cs      | 11 ++-
 .../IMRU/TestFailMapperTasksOnDispose.cs        | 90 +++++++++++++++++++
 .../IMRU/TestFailMapperTasksOnInit.cs           | 95 ++++++++++++++++++++
 .../Functional/IMRU/TestFailUpdateEvaluator.cs  |  5 ++
 .../Org.Apache.REEF.Tests.csproj                |  3 +
 10 files changed, 330 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
index f0e563d..1b7b15f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
@@ -87,15 +87,18 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         [NamedParameter(Documentation = "Type of failure to simulate")]
         internal class FailureType : Name<int>
         {
-            internal static readonly int EvaluatorFailureDuringTaskExecution = 0;
-            internal static readonly int TaskFailureDuringTaskExecution = 1;
-            internal static readonly int EvaluatorFailureDuringTaskInitialization = 2;
-            internal static readonly int TaskFailureDuringTaskInitialization = 3;
+            internal const int EvaluatorFailureDuringTaskExecution = 0;
+            internal const int TaskFailureDuringTaskExecution = 1;
+            internal const int EvaluatorFailureDuringTaskInitialization = 2;
+            internal const int TaskFailureDuringTaskInitialization = 3;
+            internal const int EvaluatorFailureDuringTaskDispose = 4;
+            internal const int TaskFailureDuringTaskDispose = 5;
 
             internal static bool IsEvaluatorFailure(int failureType)
             {
                 return failureType == EvaluatorFailureDuringTaskExecution ||
-                       failureType == EvaluatorFailureDuringTaskInitialization;
+                       failureType == EvaluatorFailureDuringTaskInitialization ||
+                       failureType == EvaluatorFailureDuringTaskDispose;
             }
         }
 
@@ -107,7 +110,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// The function is to simulate Evaluator/Task failure for mapper evaluator
         /// </summary>
-        internal sealed class TestSenderMapFunction : IMapFunction<int[], int[]>
+        internal sealed class TestSenderMapFunction : IMapFunction<int[], int[]>, IDisposable
         {
             private int _iterations;
             private readonly string _taskId;
@@ -180,6 +183,15 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                 return mapInput;
             }
 
+            public void Dispose()
+            {
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskDispose ||
+                    _failureType == FailureType.TaskFailureDuringTaskDispose)
+                {
+                    SimulateFailure(_iterations);
+                }
+            }
+
             private void SimulateFailure(int onIteration)
             {
                 if (_iterations == onIteration &&

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index dafba71..50424c7 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -71,6 +71,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private static readonly Logger Logger =
             Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
 
+        internal const string DoneActionPrefix = "DoneAction:";
+        internal const string FailActionPrefix = "FailAction:";
+
         private readonly ConfigurationManager _configurationManager;
         private readonly int _totalMappers;
         private readonly IGroupCommDriver _groupCommDriver;
@@ -699,7 +702,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private void DoneAction()
         {
             ShutDownAllEvaluators();
-            Logger.Log(Level.Info, "DoneAction done in retry {0}!!!", _numberOfRetries);
+            Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
         }
 
         /// <summary>
@@ -709,8 +712,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             ShutDownAllEvaluators();
             var msg = string.Format(CultureInfo.InvariantCulture,
-                "The system cannot be recovered after {0} retries. NumberofFailedMappers in the last try is {1}.",
-                _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
+                "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}.",
+                FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
             Exceptions.Throw(new ApplicationException(msg), Logger);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index a172908..5e53f5d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -38,8 +38,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         protected const int NumberOfRetry = 3;
 
         /// <summary>
-        /// This test is to fail one evaluator and then try to resubmit. In the last retry, 
-        /// there will be no failed evaluator and all tasks will be successfully completed. 
+        /// This test fails two evaluators during task execution stage on each retry except last. 
+        /// Job is retried until success. 
         /// </summary>
         [Fact]
         public virtual void TestFailedMapperOnLocalRuntime()
@@ -65,11 +65,14 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
             var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
 
             // on each try each task should fail or complete or disappear with failed evaluator
             // and on each try all tasks should start successfully
             Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
             Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            // eventually job succeeds
+            Assert.Equal(1, jobSuccess);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
new file mode 100644
index 0000000..c3e6657
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
@@ -0,0 +1,91 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public sealed class TestFailMapperEvaluatorsOnDispose : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test simulates evaluator failures for two mapper tasks during the task dispose stage.
+        /// The job still succeeds on the first try, because all tasks have already completed by then.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+            // All tasks complete on the first try; the 2 evaluator failures happen afterwards, during dispose,
+            // so there are no failed tasks and each task completes exactly once (no retries).
+            Assert.Equal(2, failedEvaluatorCount);
+            Assert.Equal(0, failedTaskCount);
+            Assert.Equal(numTasks, completedTaskCount);
+
+            // the job succeeds exactly once
+            Assert.Equal(1, jobSuccess);
+            CleanUp(testFolder);
+        }
+
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+            // Tasks -2- and -3- simulate an evaluator failure in Dispose (EvaluatorFailureDuringTaskDispose).
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskDispose.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index 2dfd593..5827e80 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -17,6 +17,7 @@
 
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
 using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
@@ -29,11 +30,11 @@ using Xunit;
 namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     [Collection("FunctionalTests")]
-    public class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
+    public sealed class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
     {
         /// <summary>
-        /// This test is to throw exceptions in two tasks. In the first try, there is task app failure,
-        /// and no retries will be done. 
+        /// This test fails two evaluators during task initialize stage on each retry except last. 
+        /// Job is retried until success.
         /// </summary>
         [Fact]
         public override void TestFailedMapperOnLocalRuntime()
@@ -58,20 +59,20 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
             var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
 
             // In each retry, there are 2 failed evaluators.
-            // The running tasks should receive cancellation and return properly. There will be no failed task.
+            // There will be no failed task.
             // Rest of the tasks should be canceled and send completed task event to the driver. 
             Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
             Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+
+            // eventually job succeeds
+            Assert.Equal(1, jobSuccess);
             CleanUp(testFolder);
         }
 
-        /// <summary>
-        /// Mapper function configuration. Subclass can override it to have its own test function.
-        /// </summary>
-        /// <returns></returns>
         protected override IConfiguration BuildMapperFunctionConfig()
         {
             var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
index 7fec0b7..5915c52 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -17,6 +17,7 @@
 
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
 using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
 using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
@@ -29,11 +30,11 @@ using Xunit;
 namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     [Collection("FunctionalTests")]
-    public class TestFailMapperTasks : TestFailMapperEvaluators
+    public sealed class TestFailMapperTasks : TestFailMapperEvaluators
     {
         /// <summary>
-        /// This test is to throw exceptions in two tasks. In the first try, there is task app failure,
-        /// and no retries will be done. 
+        /// This test throws exception in two tasks during task execution stage. 
+        /// This is classified as task app failure, so no retries are done, and job fails.
         /// </summary>
         [Fact]
         public override void TestFailedMapperOnLocalRuntime()
@@ -59,6 +60,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
             var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.FailActionPrefix);
 
             // each task should fail or complete
             // there should be no failed evaluators
@@ -67,6 +69,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             Assert.Equal(numTasks, completedTaskCount + failedTaskCount);
             Assert.Equal(0, failedEvaluatorCount);
             Assert.Equal(numTasks, runningTaskCount);
+            
+            // job fails
+            Assert.True(jobFailure > 0);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
new file mode 100644
index 0000000..e5956a7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
@@ -0,0 +1,90 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public sealed class TestFailMapperTasksOnDispose : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test simulates task failures in two mapper tasks during the task dispose stage.
+        /// The failures are ignored by the core REEF layer, so the driver receives no failed task events.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+            // No failed evaluators or tasks, and each task completes exactly once (no retries).
+            Assert.Equal(0, failedEvaluatorCount);
+            Assert.Equal(0, failedTaskCount);
+            Assert.Equal(numTasks, completedTaskCount);
+
+            // the job succeeds exactly once
+            Assert.Equal(1, jobSuccess);
+            CleanUp(testFolder);
+        }
+
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+            // Tasks -2- and -3- simulate a task failure in Dispose (TaskFailureDuringTaskDispose).
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskDispose.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
new file mode 100644
index 0000000..3e7f430
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
@@ -0,0 +1,95 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public sealed class TestFailMapperTasksOnInit : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test throws exceptions in two tasks during the task initialization stage.
+        /// Current exception handling can't distinguish this from a communication failure, so the job is retried.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+            // On each try except the last, the 2 tasks fail; no evaluator fails on any try.
+            // The rest of the tasks are canceled and send completed task events to the driver.
+            Assert.Equal(0, failedEvaluatorCount);
+            Assert.Equal(NumberOfRetry * 2, failedTaskCount);
+            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+
+            // the job eventually succeeds, exactly once
+            Assert.Equal(1, jobSuccess);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Mapper function configuration: tasks "IMRUMap-RandomInputPartition-2-" and "-3-" simulate task failure
+        /// during task initialization; MaxRetryNumberInRecovery and TotalNumberOfForcedFailures are both NumberOfRetry.
+        /// </summary>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskInitialization.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
index cf16e25..3bd3ee0 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
@@ -36,6 +36,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
     public class TestFailUpdateEvaluator : IMRUBrodcastReduceTestBase
     {
         private const int NumberOfRetry = 3;
+        protected const string FailActionMessage = "The system cannot be recovered after";
 
         /// <summary>
         /// This test is to fail update evaluator and then try to resubmit. We don't recover from update evaluator failure. 
@@ -64,6 +65,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
             var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+            var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.FailActionPrefix);
 
             // there should be one try with each task either completing or disappearing with failed evaluator
             // no task failures
@@ -71,6 +73,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             Assert.Equal(numTasks, completedTaskCount + failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
             Assert.Equal(numTasks, runningTaskCount);
+            
+            // job fails
+            Assert.True(jobFailure > 0);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 4774eb0..50a75e8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -125,7 +125,10 @@ under the License.
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperTasksOnDispose.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnDispose.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperTasksOnInit.cs" />
     <Compile Include="Functional\IMRU\TestFailUpdateEvaluator.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperTasks.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperEvaluators.cs" />