You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/09/02 00:27:12 UTC

[3/3] reef git commit: [REEF-1251] IMRU Driver handlers for fault tolerance

[REEF-1251] IMRU Driver handlers for fault tolerance

* Rewrite the IMRU Driver handlers to support fault tolerance
* Refactor and clean up ServiceAndContextConfigurationProvider for the updated IMRU driver
* Update IMRUClient to use the updated IMRUDriver
* Allow the client to set MaxRetryNumberInRecovery
* Fix bugs found during testing
* Add tests for evaluator and task failures on mapper and update evaluators

JIRA:
  [REEF-1251](https://issues.apache.org/jira/browse/REEF-1251)
  [REEF-1551](https://issues.apache.org/jira/browse/REEF-1551)
  [REEF-1552](https://issues.apache.org/jira/browse/REEF-1552)
  [REEF-1553](https://issues.apache.org/jira/browse/REEF-1553)

This closes #1087


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b14c8cd8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b14c8cd8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b14c8cd8

Branch: refs/heads/master
Commit: b14c8cd8191b70249a939c1cca25a61e7231a9b0
Parents: d116d94
Author: Julia Wang <ju...@apache.org>
Authored: Mon Jun 6 20:42:29 2016 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Thu Sep 1 17:18:03 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/EvaluatorRuntime.cs       |  11 +-
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |   3 +
 .../Org.Apache.REEF.Driver.csproj               |   4 +-
 .../OnREEFIMRURunTimeConfiguration.cs           |  19 +-
 .../Org.Apache.REEF.IMRU.Examples.csproj        |   1 +
 .../FaultTolerantPipelinedBroadcastAndReduce.cs | 170 ++++
 .../PipelinedBroadcastAndReduce.cs              |  27 +-
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    |  41 +-
 .../TestActiveContextManager.cs                 |  15 +-
 .../TestEvaluatorManager.cs                     |  76 +-
 .../TestTaskManager.cs                          |  54 +-
 .../API/IMRUJobDefinition.cs                    |  12 +
 .../API/IMRUJobDefinitionBuilder.cs             |  14 +
 .../OnREEF/Client/REEFIMRUClient.cs             |   4 +
 .../OnREEF/Driver/ActiveContextManager.cs       |  10 +-
 .../OnREEF/Driver/EvaluatorManager.cs           |  20 +-
 .../OnREEF/Driver/IMRUDriver.cs                 | 821 +++++++++++++++----
 .../ServiceAndContextConfigurationProvider.cs   | 191 +----
 .../OnREEF/Driver/TaskManager.cs                | 144 +++-
 .../OnREEF/IMRUTasks/MapTaskHost.cs             | 103 ++-
 .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs    |   4 +-
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  88 +-
 .../AllowedFailedEvaluatorsFraction.cs          |   2 +-
 .../Parameters/MaxRetryNumberInRecovery.cs      |  29 +
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 .../Group/Driver/IGroupCommDriver.cs            |   2 +-
 .../Group/Driver/Impl/GroupCommDriver.cs        |   2 +-
 .../Exceptions/IllegalStateException.cs         |   7 +
 .../Exceptions/InjectionException.cs            |   9 +-
 .../Functional/IMRU/IMRUBroadcastReduceTest.cs  |  27 +-
 ...oadcastReduceWithFilePartitionDataSetTest.cs |   6 +-
 .../IMRU/IMRUBroadcastReduceWithLocalFile.cs    |   1 +
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |  65 +-
 .../IMRUBrodcastReduceWithoutIMRUClientTest.cs  |   8 +-
 .../Functional/IMRU/IMRUCloseTaskTest.cs        |  26 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs | 173 ++++
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      |  86 ++
 .../Functional/IMRU/TestFailMapperTasks.cs      |  90 ++
 .../Functional/IMRU/TestFailUpdateEvaluator.cs  | 236 ++++++
 .../Org.Apache.REEF.Tests.csproj                |   4 +
 .../Remote/Impl/StreamingLink.cs                |   6 +-
 .../Remote/Impl/StreamingTransportClient.cs     |  33 +-
 .../Remote/Impl/StreamingTransportServer.cs     |  41 +-
 .../Remote/Parameters/ConnectionRetryCount.cs   |   2 +-
 44 files changed, 2176 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index c1448d1..077ba31 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -91,11 +91,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             }
         }
 
+        private string MessageFieldAsText(object field) // log helper: reports only whether a control-message field is set ("null"/"not null"), never its contents
+        {
+            return field == null ? "null" : "not null";
+        }
+
         public void Handle(EvaluatorControlProto message)
         {
             lock (_heartBeatManager)
             {
-                Logger.Log(Level.Info, "Handle Evaluator control message");
+                var msg = " done_evaluator = " + MessageFieldAsText(message.done_evaluator)
+                          + " kill_evaluator = " + MessageFieldAsText(message.kill_evaluator)
+                          + " stop_evaluator = " + MessageFieldAsText(message.stop_evaluator)
+                          + " context_control = " + MessageFieldAsText(message.context_control);
+                Logger.Log(Level.Info, "Handle Evaluator control message: " + msg);
                 if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
                 {
                     OnException(new InvalidOperationException(

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 330c7b4..053ef23 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -113,14 +113,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 }
                 catch (TaskStartHandlerException e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException");
                     _currentStatus.SetException(e.InnerException);
                 }
                 catch (TaskStopHandlerException e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException");
                     _currentStatus.SetException(e.InnerException);
                 }
                 catch (Exception e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType());
                     _currentStatus.SetException(e);
                 }
                 finally

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index c7e55b2..9bebad6 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -30,7 +30,9 @@ under the License.
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
-    <Reference Include="Microsoft.Hadoop.Avro, Version=1.5.6.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL" />
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
     <Reference Include="protobuf-net">
       <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
     </Reference>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
index f47b473..31585da 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
@@ -19,7 +19,7 @@ using System.Globalization;
 using Org.Apache.REEF.Client.Local;
 using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.IMRU.OnREEF.Client;
-using Org.Apache.REEF.IO.FileSystem.Hadoop;
+using Org.Apache.REEF.Network;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Interface;
 
@@ -59,7 +59,7 @@ namespace Org.Apache.REEF.IMRU.Examples
                    .Build();
             }
 
-            return Configurations.Merge(runtimeConfig, imruClientConfig);
+            return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration());
         }
 
         /// <summary>
@@ -71,9 +71,18 @@ namespace Org.Apache.REEF.IMRU.Examples
             IConfiguration imruClientConfig =
                 REEFIMRUClientConfiguration.ConfigurationModule.Build();
 
-            IConfiguration runtimeConfig =
-                YARNClientConfiguration.ConfigurationModule.Build();
-            return Configurations.Merge(runtimeConfig, imruClientConfig);
+            var runtimeConfig = YARNClientConfiguration.ConfigurationModule
+                .Build();
+
+            return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration());
+        }
+
+        private static IConfiguration GetTcpConfiguration() // TCP client retry settings merged into both the local and the YARN client configurations
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000") // presumably milliseconds between connection retries — TODO confirm units
+                .Build();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 1a17903..3cb5fd3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,6 +46,7 @@ under the License.
     <Compile Include="MapperCount\IdentityMapFunction.cs" />
     <Compile Include="IntSumReduceFunction.cs" />
     <Compile Include="MapperCount\MapperCount.cs" />
+    <Compile Include="PipelinedBroadcastReduce\FaultTolerantPipelinedBroadcastAndReduce.cs" />
     <Compile Include="SingleIterUpdateFunction.cs" />
     <Compile Include="NaturalSum\NaturalSum.cs" />
     <Compile Include="NaturalSum\NaturalSumMapFunction.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
new file mode 100644
index 0000000..672389c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
@@ -0,0 +1,170 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// IMRU example that runs pipelined broadcast and reduce with a mapper that injects evaluator/task failures, to exercise fault tolerance.
+    /// </summary>
+    public class FaultTolerantPipelinedBroadcastAndReduce : PipelinedBroadcastAndReduce
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce));
+
+        [Inject]
+        protected FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient imruClient) : base(imruClient)
+        {
+        }
+        
+        /// <summary>
+        /// Builds the mapper configuration: TestSenderMapFunction plus the failure-injection parameters (target task ids, failure type, retry count).
+        /// </summary>
+        /// <param name="maxRetryInRecovery">Maximum number of recovery retries; also bound to MaxRetryNumberInRecovery so the mapper can stop failing on the final retry.</param>
+        /// <returns>The merged map-function and failure-injection configuration.</returns>
+        protected override IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery)
+        {
+            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)
+                .Build();
+
+            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") // task-id prefixes: tasks whose id starts with one of these are failure targets
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) // this example always injects evaluator failures during task execution
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), maxRetryInRecovery.ToString())
+                .Build();
+
+            return Configurations.Merge(c1, c2);
+        }
+
+        [NamedParameter(Documentation = "Set of task ids which will produce task/evaluator failure")]
+        public class TaskIdsToFail : Name<ISet<string>>
+        {
+        }
+
+        [NamedParameter(Documentation = "Type of failure to simulate")]
+        public class FailureType : Name<int>
+        {
+            internal static readonly int EvaluatorFailureDuringTaskExecution = 0;
+            internal static readonly int TaskFailureDuringTaskExecution = 1;
+            internal static readonly int EvaluatorFailureDuringTaskInitialization = 2;
+            internal static readonly int TaskFailureDuringTaskInitialization = 3;
+
+            internal static bool IsEvaluatorFailure(int failureType) // true for the two evaluator-level failure types; the other two are task-level
+            {
+                return failureType == EvaluatorFailureDuringTaskExecution ||
+                       failureType == EvaluatorFailureDuringTaskInitialization;
+            }
+        }
+
+        /// <summary>
+        /// Identity map function that simulates an evaluator or task failure on selected mapper tasks.
+        /// </summary>
+        public sealed class TestSenderMapFunction : IMapFunction<int[], int[]>
+        {
+            private int _iterations; // number of Map calls so far; still 0 while the constructor runs
+            private readonly string _taskId;
+            private readonly ISet<string> _taskIdsToFail;
+            private int _failureType; // NOTE(review): never reassigned after construction; could be readonly
+            private readonly int _maxRetryInRecovery;
+
+            [Inject]
+            private TestSenderMapFunction(
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
+                [Parameter(typeof(FailureType))] int failureType,
+                [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery)
+            {
+                _taskId = taskId;
+                _taskIdsToFail = taskIdsToFail;
+                _failureType = failureType;
+                _maxRetryInRecovery = maxRetryNumberInRecovery;
+                Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1},  Failure type: {2}.", _taskId, _maxRetryInRecovery, _failureType);
+                foreach (var n in _taskIdsToFail)
+                {
+                    Logger.Log(Level.Info, "TestSenderMapFunction: taskIdsToFail: {0}", n);
+                }
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization || 
+                    _failureType == FailureType.TaskFailureDuringTaskInitialization)
+                {
+                    SimulateFailure(0); // initialization failure: fires before any Map call, while _iterations is 0
+                }
+            }
+
+            /// <summary>
+            /// Identity map: checks mapInput[0] against the iteration count and, for execution-time failure types, may simulate a failure on iteration 10.
+            /// </summary>
+            /// <param name="mapInput">integer array; mapInput[0] is expected to equal the current iteration number</param>
+            /// <returns>The same integer array, unchanged</returns>
+            int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+            {
+                _iterations++;
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations);
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskExecution ||
+                    _failureType == FailureType.TaskFailureDuringTaskExecution)
+                {
+                    SimulateFailure(10); // execution failure: fires on the 10th Map call
+                }
+
+                if (mapInput[0] != _iterations)
+                {
+                    Exceptions.Throw(new Exception("Expected value in mappers (" + _iterations + ") different from actual value (" + mapInput[0] + ")"), Logger);
+                }
+
+                return mapInput;
+            }
+
+            private void SimulateFailure(int onIteration)
+            {
+                if (_iterations == onIteration && // only on the requested iteration
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) != null && // task id matches a target prefix; NOTE(review): culture-sensitive StartsWith, StringComparison.Ordinal would be safer
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null) // ...and is not prefix + maxRetry, presumably the final recovery attempt, so that attempt can succeed — TODO confirm task-id suffix scheme
+                {
+                    Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}",
+                        FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task",
+                        _taskId);
+                    if (FailureType.IsEvaluatorFailure(_failureType))
+                    {
+                        // simulate evaluator failure by terminating the whole evaluator process
+                        Environment.Exit(1);
+                    }
+                    else
+                    {
+                        // simulate task failure; NOTE(review): ArgumentNullException(string) treats this text as the parameter name, not the message — InvalidOperationException would be clearer
+                        throw new ArgumentNullException("Simulating task failure");
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index de1598c..01c6daa 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Globalization;
 using System.IO;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
@@ -29,12 +31,12 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
     /// <summary>
     /// IMRU program that performs broadcast and reduce
     /// </summary>
-    public sealed class PipelinedBroadcastAndReduce
+    public class PipelinedBroadcastAndReduce
     {
         private readonly IIMRUClient _imruClient;
 
         [Inject]
-        private PipelinedBroadcastAndReduce(IIMRUClient imruClient)
+        protected PipelinedBroadcastAndReduce(IIMRUClient imruClient)
         {
             _imruClient = imruClient;
         }
@@ -42,7 +44,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Runs the actual broadcast and reduce job
         /// </summary>
-        public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory)
+        public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery)
         {
             var updateFunctionConfig =
                 TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
@@ -76,10 +78,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
 
             var results = _imruClient.Submit<int[], int[], int[], Stream>(
                 new IMRUJobDefinitionBuilder()
-                    .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], int[]>.ConfigurationModule
-                        .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
-                            GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
-                        .Build())
+                    .SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery))
                     .SetUpdateFunctionConfiguration(updateFunctionConfig)
                     .SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
                         .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class)
@@ -99,8 +98,22 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                     .SetJobName("BroadcastReduce")
                     .SetNumberOfMappers(numberofMappers)
                     .SetMapperMemory(mapperMemory)
+                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
                     .SetUpdateTaskMemory(updateTaskMemory)
                     .Build());
         }
+
+        protected virtual IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery) // default mapper config; maxRetryInRecovery is unused here and exists for overriding subclasses
+        {
+            return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
+                .Build();
+        }
+
+        internal void Run(int v, int chunkSize, int iterations, int dims, int mapperMemory, int updateTaskMemory) // NOTE(review): always throws; looks like an IDE-generated stub with no caller in this commit (Run.cs calls the 7-argument overload) — consider removing
+        {
+            throw new NotImplementedException();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index 18876ab..d002f2d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -19,7 +19,6 @@ using System;
 using System.Globalization;
 using System.Linq;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IO.FileSystem.Hadoop;
 using Org.Apache.REEF.IO.FileSystem.Local;
@@ -30,7 +29,7 @@ using Org.Apache.REEF.Utilities.Logging;
 namespace Org.Apache.REEF.IMRU.Examples
 {
     /// <summary>
-    /// Runs IMRU for mapper count either in localruntime or on cluster.
+    /// Runs IMRU for mapper count either in local runtime or on cluster.
     /// </summary>
     public class Run
     {
@@ -59,13 +58,14 @@ namespace Org.Apache.REEF.IMRU.Examples
             mapperCountExample.Run(numNodes - 1, filename, fileSystemConfig);
         }
 
-        public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string[] args, params string[] runtimeDir)
+        public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, bool faultTolerant, string[] args, params string[] runtimeDir)
         {
             int chunkSize = 2;
             int dims = 10;
-            int iterations = 10;
+            int iterations = 100;
             int mapperMemory = 512;
             int updateTaskMemory = 512;
+            int maxRetryNumberInRecovery = 2;         
 
             if (args.Length > 0)
             {
@@ -92,6 +92,11 @@ namespace Org.Apache.REEF.IMRU.Examples
                 iterations = Convert.ToInt32(args[4]);
             }
 
+            if (args.Length > 5)
+            {
+                maxRetryNumberInRecovery = Convert.ToInt32(args[5]);
+            }
+
             IInjector injector;
 
             if (!runOnYarn)
@@ -105,10 +110,26 @@ namespace Org.Apache.REEF.IMRU.Examples
                 injector = TangFactory.GetTang()
                     .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetYarnIMRUConfiguration(), tcpPortConfig);
             }
-            var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>();
-            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory);
+
+            if (faultTolerant)
+            {
+                var broadcastReduceFtExample = injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>();
+                broadcastReduceFtExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+            }
+            else
+            {
+                var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>();
+                broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+            }
         }
 
+        /// <summary>
+        /// Run IMRU examples from command line
+        /// </summary>
+        /// Sample command line:  
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduce 20000000 1000000 1024 1024 10 2
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduceft 20000000 1000000 1024 1024 100 2
+        /// <param name="args"></param>
         private static void Main(string[] args)
         {
             Logger.Log(Level.Info, "start running client: " + DateTime.Now);
@@ -168,10 +189,16 @@ namespace Org.Apache.REEF.IMRU.Examples
 
                 case "broadcastandreduce":
                     Logger.Log(Level.Info, "Running Broadcast and Reduce");
-                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, args.Skip(5).ToArray());
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, false, args.Skip(5).ToArray());
                     Logger.Log(Level.Info, "Done Running Broadcast and Reduce");
                     return;
 
+                case "broadcastandreduceft":
+                    Logger.Log(Level.Info, "Running Broadcast and Reduce FT");
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, true, args.Skip(5).ToArray());
+                    Logger.Log(Level.Info, "Done Running Broadcast and Reduce FT");
+                    return;
+
                 default:
                     Logger.Log(Level.Info, "wrong test name");
                     return;

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
index d3d09d6..f2a0dbf 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using NSubstitute;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
@@ -181,10 +182,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         /// <summary>
         /// A Context Manager observer for test
         /// </summary>
-        private sealed class TestContextObserver : IObserver<IDictionary<string, IActiveContext>>
+        private sealed class TestContextObserver : IObserver<IEnumerable<IActiveContext>>
         {
             private readonly int _totalExpected;
-            private IDictionary<string, IActiveContext> _contexts = null;
+            private int _contextCount = 0;
 
             internal TestContextObserver(int totalExpected)
             {
@@ -203,16 +204,12 @@ namespace Org.Apache.REEF.IMRU.Tests
 
             public int NumberOfActiveContextsReceived()
             {
-                if (_contexts != null)
-                {
-                    return _contexts.Count;                    
-                }
-                return 0;
+                return _contextCount;
             }
 
-            public void OnNext(IDictionary<string, IActiveContext> value)
+            public void OnNext(IEnumerable<IActiveContext> value)
             {
-                _contexts = value;
+                _contextCount = value.Count();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
index 489c725..a5c4caa 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
@@ -38,22 +38,22 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestValidAddRemoveAllocatedEvaluator()
         {
-            var evalutorManager = CreateTestEvaluators(3, 1);
-            Assert.Equal(3, evalutorManager.NumberOfAllocatedEvaluators);
-            Assert.True(evalutorManager.AreAllEvaluatorsAllocated());
-            Assert.True(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
-            Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2));
-            Assert.True(evalutorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
-            Assert.False(evalutorManager.IsMasterEvaluatorFailed());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            Assert.Equal(3, evaluatorManager.NumberOfAllocatedEvaluators);
+            Assert.True(evaluatorManager.AreAllEvaluatorsAllocated());
+            Assert.True(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2));
+            Assert.True(evaluatorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
+            Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
 
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            Assert.Equal(2, evalutorManager.NumberOfAllocatedEvaluators);
-            Assert.True(evalutorManager.IsMasterEvaluatorFailed());
-            Assert.Equal(0, evalutorManager.NumberofFailedMappers());
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            Assert.Equal(2, evaluatorManager.NumberOfAllocatedEvaluators);
+            Assert.True(evaluatorManager.IsMasterEvaluatorFailed());
+            Assert.Equal(0, evaluatorManager.NumberofFailedMappers());
 
-            evalutorManager.ResetFailedEvaluators();
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            Assert.True(evalutorManager.AreAllEvaluatorsAllocated());
+            evaluatorManager.ResetFailedEvaluators();
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            Assert.True(evaluatorManager.AreAllEvaluatorsAllocated());
         }
 
         /// <summary>
@@ -62,10 +62,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestNoMasterEvaluator()
         {
-            var evalutorManager = CreateEvaluatorManager(3, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(3, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -75,10 +75,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestTwoMasterEvaluator()
         {
-            var evalutorManager = CreateEvaluatorManager(3, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(3, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -88,10 +88,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestTooManyEvaluators()
         {
-            var evalutorManager = CreateEvaluatorManager(2, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(2, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -113,14 +113,14 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestResetFailedEvaluators()
         {
-            var evalutorManager = CreateTestEvaluators(3, 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
-            Assert.Equal(2, evalutorManager.NumberOfMissingEvaluators());
-            evalutorManager.ResetFailedEvaluators();
-            Assert.Equal(0, evalutorManager.NumberofFailedMappers());
-            Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
-            Assert.False(evalutorManager.IsMasterEvaluatorFailed());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.Equal(2, evaluatorManager.NumberOfMissingEvaluators());
+            evaluatorManager.ResetFailedEvaluators();
+            Assert.Equal(0, evaluatorManager.NumberofFailedMappers());
+            Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
         }
 
         /// <summary>
@@ -129,10 +129,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestReachedMaximumNumberOfEvaluatorFailures()
         {
-            var evalutorManager = CreateTestEvaluators(3, 2);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
-            Assert.True(evalutorManager.ReachedMaximumNumberOfEvaluatorFailures());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.True(evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures());
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index bdcebc2..d35f7c8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -205,7 +205,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError));
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError));
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -254,7 +254,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver));
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -281,7 +281,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -312,7 +312,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -341,7 +341,19 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
+        }
+
+        /// <summary>
+        /// Tests the scenario where no task is associated with the failed evaluator.
+        /// This can happen when a task is submitted on an evaluator that has already failed.
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorWithUnsuccessfullySubmittedTask()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+            taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluatorWithoutTaskId(EvaluatorIdPrefix + ContextIdPrefix + 1));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
         }
 
         /// <summary>
@@ -378,7 +390,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -411,7 +423,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -439,7 +451,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -476,7 +488,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -515,7 +527,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -548,7 +560,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -633,15 +645,19 @@ namespace Org.Apache.REEF.IMRU.Tests
                 case TaskManager.TaskGroupCommunicationError:
                     taskException = new IMRUTaskGroupCommunicationException(errorMsg);
                     break;
-                default:
+                case TaskManager.TaskSystemError:
                     taskException = new IMRUTaskSystemException(errorMsg);
                     break;
+                default:
+                    taskException = new IMRUTaskAppException(errorMsg);
+                    break;
             }
 
             IFailedTask failedtask = Substitute.For<IFailedTask>();
             failedtask.Id.Returns(taskId);
             failedtask.Message.Returns(errorMsg);
             failedtask.AsError().Returns(taskException);
+            failedtask.GetActiveContext().Returns(Optional<IActiveContext>.Empty());
             return failedtask;
         }
 
@@ -684,6 +700,20 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
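+        /// Creates a mock IFailedEvaluator that has no associated failed task.
+        /// This simulates the case where a task is submitted on an evaluator that has already failed.
+        /// </summary>
+        /// <param name="evaluatorId">Id returned by the mock evaluator's Id property</param>
+        /// <returns>A mock IFailedEvaluator whose FailedTask is an empty Optional</returns>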
+        private static IFailedEvaluator CreateMockFailedEvaluatorWithoutTaskId(string evaluatorId)
+        {
+            var failedEvalutor = Substitute.For<IFailedEvaluator>();
+            failedEvalutor.Id.Returns(evaluatorId);
+            failedEvalutor.FailedTask.Returns(Optional<IFailedTask>.Empty());
+            return failedEvalutor;
+        }
+
+        /// <summary>
         /// Creates a mock IConfiguration
         /// </summary>
         /// <returns></returns>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 5ea8d23..d42bf9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -41,6 +41,7 @@ namespace Org.Apache.REEF.IMRU.API
         private readonly int _updateTaskMemory;
         private readonly int _mapTaskCores;
         private readonly int _updateTaskCores;
+        private readonly int _maxRetryNumberInRecovery;
         private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig;
         private readonly bool _invokeGC;
 
@@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API
         /// <param name="updateTaskMemory">Update task memory</param>
         /// <param name="mapTaskCores">Number of map task cores</param>
         /// <param name="updateTaskCores">Number of update task cores</param>
+        /// <param name="maxRetryNumberInRecovery">Maximum number of retries attempted if the first run of the IMRU job fails</param>
         /// <param name="jobName">Job name</param>
         /// <param name="invokeGC">Whether to call garbage collector after each iteration</param>
         internal IMRUJobDefinition(
@@ -84,6 +86,7 @@ namespace Org.Apache.REEF.IMRU.API
             int updateTaskMemory,
             int mapTaskCores,
             int updateTaskCores,
+            int maxRetryNumberInRecovery,
             string jobName,
             bool invokeGC)
         {
@@ -101,6 +104,7 @@ namespace Org.Apache.REEF.IMRU.API
             _updateTaskMemory = updateTaskMemory;
             _mapTaskCores = mapTaskCores;
             _updateTaskCores = updateTaskCores;
+            _maxRetryNumberInRecovery = maxRetryNumberInRecovery;
             _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig;
             _invokeGC = invokeGC;
             _resultHandlerConfiguration = resultHandlerConfiguration;
@@ -223,6 +227,14 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Maximum number of retries attempted if the first run of the IMRU job fails.
+        /// </summary>
+        internal int MaxRetryNumberInRecovery
+        {
+            get { return _maxRetryNumberInRecovery; }
+        }
+
+        /// <summary>
         /// Per mapper configuration
         /// </summary>
         internal ISet<IConfiguration> PerMapConfigGeneratorConfig

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index f078c4a..5d56fde 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -38,6 +38,7 @@ namespace Org.Apache.REEF.IMRU.API
         private int _updateTaskMemory;
         private int _coresPerMapper;
         private int _updateTaskCores;
+        private int _maxRetryNumberInRecovery;
         private IConfiguration _mapFunctionConfiguration;
         private IConfiguration _mapInputCodecConfiguration;
         private IConfiguration _updateFunctionCodecsConfiguration;
@@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API
             _updateTaskMemory = 512;
             _coresPerMapper = 1;
             _updateTaskCores = 1;
+            _maxRetryNumberInRecovery = 0;
             _invokeGC = true;
             _perMapConfigGeneratorConfig = new HashSet<IConfiguration>();
         }
@@ -233,6 +235,17 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Sets the maximum number of retries attempted if the first run of the IMRU job fails.
+        /// </summary>
+        /// <param name="maxRetryNumberInRecovery">Maximum number of retries; defaults to 0 if not set</param>
+        /// <returns>This builder, to allow call chaining</returns>
+        public IMRUJobDefinitionBuilder SetMaxRetryNumberInRecovery(int maxRetryNumberInRecovery)
+        {
+            _maxRetryNumberInRecovery = maxRetryNumberInRecovery;
+            return this;
+        }
+
+        /// <summary>
         /// Sets Per Map Configuration
         /// </summary>
         /// <param name="perMapperConfig">Mapper configs</param>
@@ -320,6 +333,7 @@ namespace Org.Apache.REEF.IMRU.API
                 _updateTaskMemory,
                 _coresPerMapper,
                 _updateTaskCores,
+                _maxRetryNumberInRecovery,
                 _jobName,
                 _invokeGC);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 969a874..9a256e0 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -104,6 +104,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                         GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnTaskFailed,
                         GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                    .Set(DriverConfiguration.OnTaskRunning,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                     .Set(DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
                     .Build(),
                 TangFactory.GetTang().NewConfigurationBuilder()
@@ -144,6 +146,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                     jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(CoresForUpdateTask),
                     jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
+                    jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(InvokeGC),
                     jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
                 .Build();

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
index 219a9f6..437b76f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
@@ -32,12 +32,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// Manages active contexts for the driver
     /// </summary>
     [NotThreadSafe]
-    internal sealed class ActiveContextManager : IDisposable
+    internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager));
         private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>();
         private readonly int _totalExpectedContexts;
-        private IObserver<IDictionary<string, IActiveContext>> _activeContextObserver;
+        private IObserver<IEnumerable<IActiveContext>> _activeContextObserver;
 
         /// <summary>
         /// Constructor of ActiveContextManager
@@ -71,7 +71,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         /// <param name="activeContextObserver"></param>
         /// <returns></returns>
-        public IDisposable Subscribe(IObserver<IDictionary<string, IActiveContext>> activeContextObserver)
+        public IDisposable Subscribe(IObserver<IEnumerable<IActiveContext>> activeContextObserver)
         {
             if (_activeContextObserver != null)
             {
@@ -84,7 +84,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Checks if all the requested contexts are received. 
         /// </summary>
-        private bool AreAllContextsReceived
+        internal bool AreAllContextsReceived
         {
             get { return _totalExpectedContexts == NumberOfActiveContexts; }
         }
@@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             if (AreAllContextsReceived && _activeContextObserver != null)
             {
-                _activeContextObserver.OnNext(_activeContexts);
+                _activeContextObserver.OnNext(_activeContexts.Values);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
index 8fa9876..5f6856c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
@@ -201,7 +201,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Records failed Evaluator
         /// Removes it from allocated Evaluator and adds it to the failed Evaluators collection
-        /// If the evaluatorId is not in _failedEvaluators, throw IMRUSystemException
+        /// If the evaluatorId is already in _failedEvaluators, throw IMRUSystemException
         /// </summary>
         /// <param name="evaluatorId"></param>
         internal void RecordFailedEvaluator(string evaluatorId)
@@ -217,11 +217,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
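+        /// Removes an evaluator id from the collection of failed evaluators.
+        /// </summary>
+        /// <param name="evaluatorId">Id of the failed evaluator; if it is not recorded as failed, IMRUSystemException is thrown</param>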
+        internal void RemoveFailedEvaluator(string evaluatorId)
+        {
+            if (!_failedEvaluatorIds.Contains(evaluatorId))
+            {
+                string msg = string.Format("The failed evaluator {0} is not recorded in list of failed evaluators.", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            _failedEvaluatorIds.Remove(evaluatorId);
+        }
+
+        /// <summary>
         /// Checks if the number of failed Evaluators has reached allowed maximum number of evaluator failures 
         /// </summary>
-        internal bool ReachedMaximumNumberOfEvaluatorFailures()
+        internal bool ExceededMaximumNumberOfEvaluatorFailures()
         {
-            return _failedEvaluatorIds.Count >= AllowedNumberOfEvaluatorFailures;
+            return _failedEvaluatorIds.Count > AllowedNumberOfEvaluatorFailures;
         }
 
         /// <summary>