You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/09/02 00:27:10 UTC

[1/3] reef git commit: [REEF-1251] IMRU Driver handlers for fault tolerant

Repository: reef
Updated Branches:
  refs/heads/master d116d94e6 -> b14c8cd81


http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index af4fbf1..01a2bdb 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -17,6 +17,8 @@
 
 using System;
 using System.IO;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
@@ -77,14 +79,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="resultHandler">Result handler</param>
         /// <param name="taskCloseCoordinator">Task close Coordinator</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param>
+        /// <param name="taskId">task id</param>
         [Inject]
         private UpdateTaskHost(
             IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
             IGroupCommClient groupCommunicationsClient,
             IIMRUResultHandler<TResult> resultHandler,
             TaskCloseCoordinator taskCloseCoordinator,
-            [Parameter(typeof(InvokeGC))] bool invokeGC)
+            [Parameter(typeof(InvokeGC))] bool invokeGC,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId)
         {
+            Logger.Log(Level.Info, "Entering constructor of UpdateTaskHost for task id {0}", taskId);
             _updateTask = updateTask;
             _groupCommunicationsClient = groupCommunicationsClient;
             var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
@@ -95,6 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _resultHandler = resultHandler;
             _taskCloseCoordinator = taskCloseCoordinator;
             _cancellationSource = new CancellationTokenSource();
+            Logger.Log(Level.Info, "UpdateTaskHost initialized.");
         }
 
         /// <summary>
@@ -104,6 +110,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <returns></returns>
         public byte[] Call(byte[] memento)
         {
+            Logger.Log(Level.Info, "Entering UpdateTaskHost Call().");
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
             try
@@ -148,27 +155,80 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     "Received OperationCanceledException in UpdateTaskHost with message: {0}.",
                     e.Message);
             }
-            catch (IOException e)
+            catch (Exception e)
             {
-                Logger.Log(Level.Error, "Received IOException in UpdateTaskHost with message: {0}.", e.Message);
-                if (!_cancellationSource.IsCancellationRequested)
+                if (e is IOException || e is TcpClientConnectionException || e is RemotingException ||
+                    e is SocketException)
+                {
+                    Logger.Log(Level.Error,
+                        "Received Exception {0} in UpdateTaskHost with message: {1}. The cancellation token is: {2}.",
+                        e.GetType(),
+                        e.Message,
+                        _cancellationSource.IsCancellationRequested);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        Logger.Log(Level.Error,
+                            "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                            _cancellationSource.IsCancellationRequested);
+                        throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    }
+                }
+                else if (e is AggregateException)
+                {
+                    Logger.Log(Level.Error,
+                        "Received AggregateException. The cancellation token is: {0}.",
+                        _cancellationSource.IsCancellationRequested);
+                    if (e.InnerException != null)
+                    {
+                        Logger.Log(Level.Error,
+                            "InnerException {0}, with message {1}.",
+                            e.InnerException.GetType(),
+                            e.InnerException.Message);
+                    }
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        if (e.InnerException != null && e.InnerException is IOException)
+                        {
+                            Logger.Log(Level.Error,
+                                "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                                _cancellationSource.IsCancellationRequested);
+                            throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                        }
+                        else
+                        {
+                            throw e;
+                        }
+                    }
+                }
+                else
                 {
-                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    Logger.Log(Level.Error,
+                       "UpdateTaskHost is throwing Excetion {0}, messge {1} with cancellation token: {2} and StackTrace {3}.",
+                       e.GetType(),
+                       e.Message,
+                       _cancellationSource.IsCancellationRequested,
+                       e.StackTrace);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        throw e;
+                    }
                 }
             }
-            catch (TcpClientConnectionException e)
+            finally
             {
-                Logger.Log(Level.Error,
-                    "Received TcpClientConnectionException in UpdateTaskHost with message: {0}.",
-                    e.Message);
-                if (!_cancellationSource.IsCancellationRequested)
+                try
+                {
+                    _resultHandler.Dispose();
+                }
+                catch (Exception e)
                 {
-                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    Logger.Log(Level.Error, "Exception in dispose result handler.", e);
+                    //// TODO throw proper exceptions JIRA REEF-1492
                 }
+                _taskCloseCoordinator.SignalTaskStopped();
+                Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
             }
-            _resultHandler.Dispose();
-            _taskCloseCoordinator.SignalTaskStopped();
-            Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
+
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
index 5a5b9c3..40816c1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
 {
-    [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluators * Number of mappers) tolerated before throwing exception", "failedevaluators", "2.0")]
+    [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluators * Number of mappers) tolerated before throwing exception", "failedevaluators", "0.5")]
     internal sealed class AllowedFailedEvaluatorsFraction : Name<double>
     {
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
new file mode 100644
index 0000000..cc7a5b7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    /// <summary>
+    /// Max retry number for the system recovery
+    /// </summary>
+    [NamedParameter("Maximum retry number in fault tolerant recovery.", "maxRetryInRecovery", "3")]
+    public sealed class MaxRetryNumberInRecovery : Name<int>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 30d110a..cd3603a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -104,6 +104,7 @@ under the License.
     <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
+    <Compile Include="OnREEF\Parameters\MaxRetryNumberInRecovery.cs" />
     <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
     <Compile Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
index acf640b..16dba10 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
@@ -29,7 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
         /// Returns the identifier for the master task
         /// </summary>
-        string MasterTaskId { get; }
+        string MasterTaskId { get; set; }
 
         ICommunicationGroupDriver DefaultGroup { get; }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index e636b04..7fe797a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -100,7 +100,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <summary>
         /// Returns the identifier for the master task
         /// </summary>
-        public string MasterTaskId { get; private set; }
+        public string MasterTaskId { get; set; }
 
         public ICommunicationGroupDriver DefaultGroup
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
index 6736e10..fc4a12b 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
@@ -16,9 +16,11 @@
 // under the License.
 
 using System;
+using System.Runtime.Serialization;
 
 namespace Org.Apache.REEF.Tang.Exceptions
 {
+    [Serializable]
     public sealed class IllegalStateException : Exception
     {
         public IllegalStateException()
@@ -30,6 +32,11 @@ namespace Org.Apache.REEF.Tang.Exceptions
         {           
         }
 
+        public IllegalStateException(SerializationInfo info, StreamingContext context)
+            : base(info, context)
+        {
+        }
+
         public IllegalStateException(string message, Exception innerException)
             : base(message, innerException)
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
index 4fc991c..b9a8189 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
@@ -16,19 +16,26 @@
 // under the License.
 
 using System;
+using System.Runtime.Serialization;
 
 namespace Org.Apache.REEF.Tang.Exceptions
 {
+    [Serializable]
     public sealed class InjectionException : Exception
     {
         internal InjectionException(string msg)
             : base(msg)
-        {           
+        {
         }
 
         internal InjectionException(string message, Exception innerException)
             : base(message, innerException)
         {
         }
+
+        public InjectionException(SerializationInfo info, StreamingContext context)
+            : base(info, context)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
index 376ca7c..63126e8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
@@ -28,31 +28,50 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUMapperCountTest));
 
         private static readonly int NumNodes = 4;
+        private static readonly int NumOfRetry = 2;
 
         [Fact]
         [Trait("Description", "Run IMRU broadcast and reduce example as test.")]
         void TestIMRUBroadcastReduceOnLocalRuntime()
         {
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestIMRUBroadcastReduce(false, testFolder);
+            TestIMRUBroadcastReduce(false, false, testFolder);
             ValidateSuccessForLocalRuntime(NumNodes, testFolder: testFolder);
             CleanUp(testFolder);
         }
 
+        [Fact]
+        [Trait("Description", "Run IMRU broadcast and reduce example as test.")]
+        void TestIMRUBroadcastReduceWithFTOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestIMRUBroadcastReduce(false, true, testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var runningTaskCount = GetMessageCount(lines, "Received IRunningTask");
+            var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator");
+            var failedTaskCount = GetMessageCount(lines, "Received IFailedTask");
+            Assert.Equal((NumOfRetry + 1) * NumNodes, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.Equal((NumOfRetry + 1) * NumNodes, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
         [Fact(Skip = "Requires Yarn")]
         [Trait("Description", "Run IMRU broadcast and reduce example as test on Yarn.")]
         void TestIMRUBroadcastReduceOnYarn()
         {
-            TestIMRUBroadcastReduce(true);
+            TestIMRUBroadcastReduce(true, false);
         }
 
-        private void TestIMRUBroadcastReduce(bool runOnYarn, params string[] testFolder)
+        private void TestIMRUBroadcastReduce(bool runOnYarn, bool faultTolerant, params string[] testFolder)
         {
             var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
                 .Set(TcpPortConfigurationModule.PortRangeStart, "8900")
                 .Set(TcpPortConfigurationModule.PortRangeCount, "1000")
                 .Build();
-            Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, new string[0], testFolder);
+
+            string[] args = { "10", "2", "512", "512", "100", NumOfRetry.ToString() };
+            Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, faultTolerant, args, testFolder);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
index e58e236..3bf712b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
@@ -75,6 +75,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 iterations,
                 mapperMemory,
                 updateTaskMemory,
+                0,
                 testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100);
             CleanUp(testFolder);
@@ -90,11 +91,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int iterations,
             int mapperMemory,
             int updateTaskMemory,
+            int numberOfRetryInRecovery = 0,
             string testFolder = DefaultRuntimeFolder)
         {
             string runPlatform = runOnYarn ? "yarn" : "local";
             TestRun(DriverConfiguration<int[], int[], int[], IEnumerable<Row>>(
-                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory),
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery),
                 DriverEventHandlerConfigurations<int[], int[], int[], IEnumerable<Row>>()),
                 typeof(BroadcastReduceDriver),
                 numTasks,
@@ -121,6 +123,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
index aa287b1..2bacd5a 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
@@ -48,6 +48,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 iterations,
                 mapperMemory,
                 updateTaskMemory,
+                0,
                 testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100);
             CleanUp(testFolder);

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index 2f4b109..f20ec31 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -16,10 +16,11 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Linq;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
@@ -29,6 +30,7 @@ using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverA
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -48,6 +50,11 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         protected static readonly Logger Logger = Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase));
         private const string IMRUJobName = "IMRUBroadcastReduce";
 
+        protected const string CompletedTaskMessage = "CompletedTaskMessage";
+        protected const string RunningTaskMessage = "RunningTaskMessage";
+        protected const string FailedTaskMessage = "FailedTaskMessage";
+        protected const string FailedEvaluatorMessage = "FailedEvaluatorMessage";
+
         /// <summary>
         /// Abstract method for subclass to override it to provide configurations for driver handlers 
         /// </summary>
@@ -70,6 +77,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="dims"></param>
         /// <param name="iterations"></param>
         /// <param name="mapperMemory"></param>
+        /// <param name="numberOfRetryInRecovery"></param>
         /// <param name="updateTaskMemory"></param>
         /// <param name="testFolder"></param>
         protected void TestBroadCastAndReduce(bool runOnYarn,
@@ -79,11 +87,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int iterations,
             int mapperMemory,
             int updateTaskMemory,
+            int numberOfRetryInRecovery = 0,
             string testFolder = DefaultRuntimeFolder)
         {
             string runPlatform = runOnYarn ? "yarn" : "local";
             TestRun(DriverConfiguration<int[], int[], int[], Stream>(
-                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory),
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery),
                 DriverEventHandlerConfigurations<int[], int[], int[], Stream>()),
                 typeof(BroadcastReduceDriver),
                 numTasks,
@@ -151,6 +160,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(CoresForUpdateTask),
                     jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
+                    jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(InvokeGC),
                     jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
                 .Build();
@@ -190,13 +201,15 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="dim"></param>
         /// <param name="mapperMemory"></param>
         /// <param name="updateTaskMemory"></param>
+        /// <param name="numberOfRetryInRecovery"></param>
         /// <returns></returns>
         protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
             int chunkSize,
             int numIterations,
             int dim,
             int mapperMemory,
-            int updateTaskMemory)
+            int updateTaskMemory,
+            int numberOfRetryInRecovery)
         {
             var updateFunctionConfig =
                 TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
@@ -221,6 +234,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .SetNumberOfMappers(numberofMappers)
                 .SetMapperMemory(mapperMemory)
                 .SetUpdateTaskMemory(updateTaskMemory)
+                .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery)
                 .Build();
         }
 
@@ -309,5 +323,50 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IntArraySumReduceFunction>.Class)
                 .Build();
         }
+
+        /// <summary>
+        /// This class contains handlers for log purpose only
+        /// </summary>
+        protected sealed class MessageLogger :
+            IObserver<ICompletedTask>,
+            IObserver<IFailedEvaluator>,
+            IObserver<IFailedTask>,
+            IObserver<IRunningTask>
+        {
+            [Inject]
+            private MessageLogger()
+            {
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, CompletedTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId);
+            }
+
+            public void OnNext(IFailedTask value)
+            {
+                Logger.Log(Level.Info, FailedTaskMessage + " " + value.Id + " " + value.GetActiveContext().Value.EvaluatorId);
+            }
+
+            public void OnNext(IFailedEvaluator value)
+            {
+                Logger.Log(Level.Info, FailedEvaluatorMessage + " " + value.Id + " " + (value.FailedTask.IsPresent() ? value.FailedTask.Value.Id : "no task"));
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                Logger.Log(Level.Info, RunningTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId);
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
index c00f44d..eaa405f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
@@ -38,8 +38,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int mapperMemory = 5120;
             int updateTaskMemory = 5120;
             int numTasks = 4;
+            int numberOfRetryInRecovery = 4;
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder);
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder);
             CleanUp(testFolder);
         }
@@ -56,7 +57,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int mapperMemory = 5120;
             int updateTaskMemory = 5120;
             int numTasks = 4;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+            int numberOfRetryInRecovery = 4;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery);
         }
 
         /// <summary>
@@ -82,6 +84,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
                      GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index b462438..068aa50 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -34,23 +34,19 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     /// <summary>
     /// This is to test close event handler in IMRU tasks
-    /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers so that to trigger close events and handle the 
+    /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers so that to trigger close events and handle the
     /// failed tasks and completed tasks
     /// </summary>
     [Collection("FunctionalTests")]
     public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
     {
-        private const string CompletedTaskMessage = "CompletedTaskMessage";
-        private const string FailEvaluatorMessage = "FailEvaluatorMessage";
-        private const string FailTaskMessage = "FailTaskMessage";
-
         /// <summary>
         /// This test is for running in local runtime
         /// It sends close event for all the running tasks.
-        /// In the task close handler, the cancellation token will be set, and as a result tasks will return from the Call() 
+        /// In the task close handler, the cancellation token will be set, and as a result tasks will return from the Call()
         /// method and driver will receive ICompletedTask.
-        /// In the exceptional case, task might throw exception from Call() method, as a result, driver will receive IFailedTask. 
-        /// Expect number of CompletedTask and FailedTask equals to the total number of tasks. No failed Evaluator. 
+        /// In the exceptional case, task might throw exception from Call() method, as a result, driver will receive IFailedTask.
+        /// Expect number of CompletedTask and FailedTask equals to the total number of tasks. No failed Evaluator.
         /// </summary>
         [Fact]
         public void TestTaskCloseOnLocalRuntime()
@@ -61,11 +57,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int mapperMemory = 512;
             const int updateTaskMemory = 512;
             const int numTasks = 4;
+            const int numOfRetryInRecovery = 4;
             var testFolder = DefaultRuntimeFolder + TestId;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder);
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery, testFolder);
             string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120);
             var completedCount = GetMessageCount(lines, CompletedTaskMessage);
-            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            var failedCount = GetMessageCount(lines, FailedTaskMessage);
             Assert.Equal(numTasks, completedCount + failedCount);
             CleanUp(testFolder);
         }
@@ -83,12 +80,13 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int mapperMemory = 512;
             const int updateTaskMemory = 512;
             const int numTasks = 4;
-            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+            const int numOfRetryInRecovery = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery);
         }
 
         /// <summary>
         /// This method overrides base class method and defines its own event handlers for driver. 
-        /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios 
+        /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios
         /// and verify the test result. 
         /// Rest of the event handlers use those from IMRUDriver. In IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
         /// </summary>
@@ -190,7 +188,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             {
                 lock (_lock)
                 {
-                    Logger.Log(Level.Info, FailTaskMessage + value.Id);
+                    Logger.Log(Level.Info, FailedTaskMessage + value.Id);
                     CloseRunningTasks();
                     value.GetActiveContext().Value.Dispose();
                 }
@@ -202,7 +200,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             /// <param name="value"></param>
             public void OnNext(IFailedEvaluator value)
             {
-                throw new Exception(FailEvaluatorMessage);
+                throw new Exception(FailedEvaluatorMessage);
             }
 
             /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
new file mode 100644
index 0000000..af02405
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -0,0 +1,173 @@
+\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Network;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperEvaluators : IMRUBrodcastReduceTestBase
+    {
+        protected const int NumberOfRetry = 3;
+
+        /// <summary>
+        /// This test is to fail the evaluators of two mapper tasks and then try to resubmit. In the last retry,
+        /// there will be no failed evaluator and all tasks will be successfully completed.
+        /// </summary>
+        [Fact]
+        public virtual void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, CompletedTaskMessage);
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // on each try each task should fail or complete or disappear with failed evaluator
+            // and on each try all tasks should start successfully
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test runs the mapper failure scenario of IMRUDriver and IMRUTasks on YARN.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public virtual void TestFailedMapperOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+        }
+
+        /// <summary>
+        /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver.
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Mapper function configuration. Subclass can override it to have its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+
+            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+
+            return Configurations.Merge(c1, c2, GetTcpConfiguration());
+        }
+
+        /// <summary>
+        /// Update function configuration. Subclass can override it to have its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction,
+                    GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+                .Build();
+
+            return Configurations.Merge(c, GetTcpConfiguration());
+        }
+
+        /// <summary>
+        /// Override default setting for retry policy
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration GetTcpConfiguration()
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000")
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
new file mode 100644
index 0000000..6fecb2c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -0,0 +1,86 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test is to fail the evaluators of two mapper tasks during task initialization.
+        /// The job is expected to be retried, so NumberOfRetry + 1 tries are made in total.
+        /// </summary>
+        [Fact(Skip = "Times out at high timeout for RetryCountWaitingForRegistration; disabling until this parameter is configurable in test.")]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // on each try each task should fail or complete or disappear with failed evaluator
+            // not all tasks will start successfully, so not checking this
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Mapper function configuration. Subclass can override it to have its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskInitialization.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
new file mode 100644
index 0000000..dc998fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -0,0 +1,90 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperTasks : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// This test is to throw exceptions in two mapper tasks during task execution. These are task failures,
+        /// not evaluator failures, and the job is expected to be retried, so NumberOfRetry + 1 tries are made in total.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // on each try each task should fail or complete
+            // there should be no failed evaluators
+            // and on each try all tasks should start successfully
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount);
+            Assert.Equal(0, failedEvaluatorCount);
+            Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Mapper function configuration. Subclass can override it to have its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)                   
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
new file mode 100644
index 0000000..cf16e25
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
@@ -0,0 +1,236 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailUpdateEvaluator : IMRUBrodcastReduceTestBase
+    {
+        private const int NumberOfRetry = 3;
+
+        /// <summary>
+        /// This test is to fail the update evaluator. We don't recover from update evaluator failure, so only one try is expected.
+        /// </summary>
+        [Fact]
+        public virtual void TestFailedUpdateOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // there should be one try with each task either completing or disappearing with failed evaluator
+            // no task failures
+            // and on this try all tasks should start successfully
+            Assert.Equal(numTasks, completedTaskCount + failedEvaluatorCount);
+            Assert.Equal(0, failedTaskCount);
+            Assert.Equal(numTasks, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test runs the update evaluator failure scenario of IMRUDriver and IMRUTasks on YARN.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public virtual void TestFailedUpdateOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+        }
+
+        /// <summary>
+        /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver.
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Update function configuration. It binds TestUpdateFunction as the update function together with the FailureType parameter.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction,
+                    GenericType<TestUpdateFunction>.Class)                   
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .Build();
+        }
+
+        internal sealed class TestUpdateFunction : IUpdateFunction<int[], int[], int[]>
+        {
+            private int _iterations;
+            private readonly int _maxIters;
+            private readonly int _dim;
+            private readonly int[] _intArr;
+            private readonly int _workers;
+            private readonly string _taskId;
+            private int _failureType;
+
+            [Inject]
+            private TestUpdateFunction(
+                [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+                [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
+                [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers,
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                [Parameter(typeof(FailureType))] int failureType)
+            {
+                _maxIters = maxIters;
+                _iterations = 0;
+                _dim = dim;
+                _intArr = new int[_dim];
+                _workers = numWorkers;
+                _taskId = taskId;
+                _failureType = failureType;
+                Logger.Log(Level.Info, "TestUpdateFunction: TaskId: {0}", _taskId);
+                Logger.Log(Level.Info, "Failure type: {0} failure", FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task");
+            }
+
+            /// <summary>
+            /// Update function. Checks the first input element against (iteration + 1) * workers, advances the iteration counter and calls MakeException.
+            /// </summary>
+            /// <param name="input">integer array</param>
+            /// <returns>AnotherRound with the internal array set to (iteration + 1) while iterations remain; otherwise Done with the input</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] input)
+            {
+                if (input[0] != (_iterations + 1) * _workers)
+                {
+                    Exceptions.Throw(new Exception("Expected input to update functon not same as actual input"), Logger);
+                }
+
+                _iterations++;
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}", input[0], _iterations);
+                MakeException();
+
+                if (_iterations < _maxIters)
+                {
+                    for (int i = 0; i < _dim; i++)
+                    {
+                        _intArr[i] = _iterations + 1;
+                    }
+
+                    return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+                }
+
+                return UpdateResult<int[], int[]>.Done(input);
+            }
+
+            /// <summary>
+            /// Initialize function. Sends integer array with value 1 to all mappers
+            /// </summary>
+            /// <returns>Map input</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize()
+            {
+                for (int i = 0; i < _dim; i++)
+                {
+                    _intArr[i] = _iterations + 1;
+                }
+
+                return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+            }
+
+            private void MakeException()
+            {
+                if (_iterations == 10 && !_taskId.EndsWith("-" + NumberOfRetry))
+                { 
+                    Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}",
+                        FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task",
+                        _taskId);
+                    if (FailureType.IsEvaluatorFailure(_failureType))
+                    {
+                        // simulate evaluator failure
+                        Environment.Exit(1);
+                    }
+                    else
+                    {
+                        // simulate task failure; NOTE(review): the single-string ArgumentNullException constructor takes paramName, not a message - confirm this is intended
+                        throw new ArgumentNullException("Simulating task failure");
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 8856fca..d043ab1 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -124,6 +124,10 @@ under the License.
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />
+    <Compile Include="Functional\IMRU\TestFailUpdateEvaluator.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperTasks.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluators.cs" />
     <Compile Include="Functional\IMRU\TestTaskExceptions.cs" />
     <Compile Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" />
     <Compile Include="Functional\Messaging\TestMessageEventManager.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
index 4a8a048..572b245 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -162,8 +162,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             }
             catch (Exception e)
             {
-                Logger.Log(Level.Warning, "In Read function unable to read the message.");
-                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                Logger.Log(Level.Warning, "In StreamingLink::Read function unable to read the message {0}.", e.GetType());
                 throw;
             }
         }
@@ -186,8 +185,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             }
             catch (Exception e)
             {
-                Logger.Log(Level.Warning, "In ReadAsync function unable to read the message.");
-                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                Logger.Log(Level.Warning, "In StreamingLink::ReadAsync function unable to read the message, {0}.", e.GetType());
                 throw;
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
index 0ec4c8a..cca8abd 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -19,6 +19,7 @@ using System;
 using System.Net;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.AsyncUtils;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.StreamingCodec;
@@ -68,7 +69,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             : this(remoteEndpoint, streamingCodec, clientFactory)
         {
             _observer = observer;
-            Task.Factory.StartNew(() => ResponseLoop(), TaskCreationOptions.LongRunning);
+            try
+            {
+                Task.Factory.StartNew(() => ResponseLoop(), TaskCreationOptions.LongRunning);
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "StreamingTransportClient get exception from ResponseLoop: {0}.", e.GetType());
+                throw e;
+            }            
         }
 
         /// <summary>
@@ -111,16 +120,24 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         private async Task ResponseLoop()
         {
-            while (!_cancellationSource.IsCancellationRequested)
+            try
             {
-                T message = await _link.ReadAsync(_cancellationSource.Token);
-                if (message == null)
+                while (!_cancellationSource.IsCancellationRequested)
                 {
-                    break;
-                }
+                    T message = await _link.ReadAsync(_cancellationSource.Token);
+                    if (message == null)
+                    {
+                        break;
+                    }
 
-                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
-                _observer.OnNext(transportEvent);
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+                    _observer.OnNext(transportEvent);
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "StreamingTransportClient get exception in ResponseLoop: {0}.", e.GetType());
+                throw; // rethrow as-is: "throw e" would reset the stack trace to this frame
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
index 706fac4..6a9ea1a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -162,7 +162,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 {
                     TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
                     ProcessClient(client).LogAndIgnoreExceptionIfAny(
-                        LOGGER, "Task Exception observed processing client in StreamingTransportServer.", Level.Warning);
+                        LOGGER,
+                        "StreamingTransportServer observed Task Exception during client processing.",
+                        Level.Warning);
                 }
             }
             catch (InvalidOperationException)
@@ -173,6 +175,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             {
                 LOGGER.Log(Level.Info, "StreamingTransportServer has been closed.");
             }
+            catch (Exception e)
+            {
+                LOGGER.Log(Level.Warning, "StreamingTransportServer got exception: {0}.", e.GetType());
+                throw e;
+            }
         }
 
         /// <summary>
@@ -181,24 +188,32 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="client">The connected client</param>
         private async Task ProcessClient(TcpClient client)
         {
-            // Keep reading messages from client until they disconnect or timeout
             CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
+            try
             {
-                while (!token.IsCancellationRequested)
+                // Keep reading messages from client until they disconnect or timeout
+                using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
                 {
-                    T message = await link.ReadAsync(token);
-
-                    if (message == null)
+                    while (!token.IsCancellationRequested)
                     {
-                        break;
-                    }
+                        T message = await link.ReadAsync(token);
 
-                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
-                    _remoteObserver.OnNext(transportEvent);
+                        if (message == null)
+                        {
+                            break;
+                        }
+
+                        TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+                        _remoteObserver.OnNext(transportEvent);
+                    }
+                    LOGGER.Log(Level.Error,
+                        "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
                 }
-                LOGGER.Log(Level.Error,
-                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+            }
+            catch (Exception e)
+            {
+                LOGGER.Log(Level.Warning, "StreamingTransportServer get exception in ProcessClient: {0}, IsCancellationRequested {1}.", e.GetType(), token.IsCancellationRequested);
+                throw; // rethrow as-is: "throw e" would reset the stack trace to this frame
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
index 934983f..75087d5 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Wake.Remote.Parameters
 {
-    [NamedParameter("Number of retries for connecting to endpoint", defaultValue: "20")]
+    [NamedParameter("Number of retries for connecting to endpoint", defaultValue: "200")]
     public sealed class ConnectionRetryCount : Name<int>
     {
     }


[2/3] reef git commit: [REEF-1251] IMRU Driver handlers for fault tolerant

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 58b75ed..dafba71 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -1,4 +1,4 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -18,14 +18,15 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Globalization;
 using System.Linq;
-using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
 using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
@@ -33,60 +34,94 @@ using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
 using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 {
     /// <summary>
-    /// Implements the IMRU driver on REEF
+    /// Implements the IMRU driver on REEF with fault tolerance
     /// </summary>
     /// <typeparam name="TMapInput">Map Input</typeparam>
     /// <typeparam name="TMapOutput">Map output</typeparam>
     /// <typeparam name="TResult">Result</typeparam>
     /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam>
-    internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> 
-        : IObserver<IDriverStarted>,
+    internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> :
+        IObserver<IDriverStarted>,
         IObserver<IAllocatedEvaluator>,
         IObserver<IActiveContext>,
         IObserver<ICompletedTask>,
         IObserver<IFailedEvaluator>,
         IObserver<IFailedContext>,
-        IObserver<IFailedTask>
+        IObserver<IFailedTask>,
+        IObserver<IRunningTask>,
+        IObserver<IEnumerable<IActiveContext>>
     {
         private static readonly Logger Logger =
             Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
 
         private readonly ConfigurationManager _configurationManager;
         private readonly int _totalMappers;
-        private readonly IEvaluatorRequestor _evaluatorRequestor;
-        private ICommunicationGroupDriver _commGroup;
         private readonly IGroupCommDriver _groupCommDriver;
-        private readonly TaskStarter _groupCommTaskStarter;
-        private readonly ConcurrentStack<IConfiguration> _perMapperConfiguration;
-        private readonly int _coresPerMapper;
-        private readonly int _coresForUpdateTask;
-        private readonly int _memoryPerMapper;
-        private readonly int _memoryForUpdateTask;
+        private readonly INameServer _nameServer;
+        private ConcurrentStack<IConfiguration> _perMapperConfigurationStack;
         private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
-        private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>();
-        private readonly int _allowedFailedEvaluators;
-        private int _currentFailedEvaluators = 0;
         private readonly bool _invokeGC;
-        private int _numberOfReadyTasks = 0;
+        private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider;
 
-        private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
-            _serviceAndContextConfigurationProvider;
+        /// <summary>
+        /// The lock for the driver. 
+        /// </summary>
+        private readonly object _lock = new object();
+
+        /// <summary>
+        /// Manages tasks, maintains task states and is responsible for task submission for the driver.
+        /// </summary>
+        private TaskManager _taskManager;
+
+        /// <summary>
+        /// Manages Active Contexts for the driver.
+        /// </summary>
+        private readonly ActiveContextManager _contextManager;
+
+        /// <summary>
+        /// Manages allocated and failed Evaluators for driver.
+        /// </summary>
+        private readonly EvaluatorManager _evaluatorManager;
+
+        /// <summary>
+        /// Defines the max retry number for recoveries. It is configurable for the driver. 
+        /// </summary>
+        private readonly int _maxRetryNumberForFaultTolerant;
+
+        /// <summary>
+        /// System State of the driver. 
+        /// <see href="https://issues.apache.org/jira/browse/REEF-1223"></see> 
+        /// </summary>
+        private SystemStateMachine _systemState;
+
+        /// <summary>
+        /// Indicates whether the driver is on its first try. Once the system enters recovery, it is set to false. 
+        /// </summary>
+        private bool _isFirstTry = true;
+
+        /// <summary>
+        /// Records the number of retries performed for recovery. 
+        /// </summary>
+        private int _numberOfRetries;
+
+        private const int DefaultMaxNumberOfRetryInRecovery = 3; 
 
         [Inject]
         private IMRUDriver(IPartitionedInputDataSet dataSet,
@@ -98,181 +133,517 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
             [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
             [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
+            [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
             [Parameter(typeof(InvokeGC))] bool invokeGC,
-            IGroupCommDriver groupCommDriver)
+            IGroupCommDriver groupCommDriver,
+            INameServer nameServer)
         {
             _configurationManager = configurationManager;
-            _evaluatorRequestor = evaluatorRequestor;
             _groupCommDriver = groupCommDriver;
-            _coresPerMapper = coresPerMapper;
-            _coresForUpdateTask = coresForUpdateTask;
-            _memoryPerMapper = memoryPerMapper;
-            _memoryForUpdateTask = memoryForUpdateTask;
+            _nameServer = nameServer;
             _perMapperConfigs = perMapperConfigs;
             _totalMappers = dataSet.Count;
-
-            _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * dataSet.Count);
             _invokeGC = invokeGC;
+            _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery > 0 ? maxRetryNumberInRecovery : DefaultMaxNumberOfRetryInRecovery;
+
+            _contextManager = new ActiveContextManager(_totalMappers + 1);
+            _contextManager.Subscribe(this);
+
+            var updateSpec = new EvaluatorSpecification(memoryForUpdateTask, coresForUpdateTask);
+            var mapperSpec = new EvaluatorSpecification(memoryPerMapper, coresPerMapper);
+            var allowedFailedEvaluators = (int)(failedEvaluatorsFraction * _totalMappers);
+            _evaluatorManager = new EvaluatorManager(_totalMappers + 1, allowedFailedEvaluators, evaluatorRequestor, updateSpec, mapperSpec);
 
-            AddGroupCommunicationOperators();
-            _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalMappers + 1);
-            _perMapperConfiguration = ConstructPerMapperConfigStack(_totalMappers);
+            _systemState = new SystemStateMachine();
             _serviceAndContextConfigurationProvider =
                 new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet);
 
             var msg =
-                string.Format("map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}",
-                    _memoryPerMapper,
-                    _memoryForUpdateTask,
-                    _coresPerMapper,
-                    _coresForUpdateTask);
+                string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.",
+                    memoryPerMapper,
+                    memoryForUpdateTask,
+                    coresPerMapper,
+                    coresForUpdateTask,
+                    _maxRetryNumberForFaultTolerant,
+                    allowedFailedEvaluators);
             Logger.Log(Level.Info, msg);
         }
 
+        #region IDriverStarted
         /// <summary>
-        /// Requests for evaluator for update task
+        /// Requests the update evaluator and all map evaluators when the driver starts
         /// </summary>
         /// <param name="value">Event fired when driver started</param>
         public void OnNext(IDriverStarted value)
         {
-            RequestUpdateEvaluator();
             //// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver.
+            _evaluatorManager.RequestUpdateEvaluator();
+            _evaluatorManager.RequestMapEvaluators(_totalMappers);
         }
+        #endregion IDriverStarted
 
+        #region IAllocatedEvaluator
         /// <summary>
-        /// Specifies context and service configuration for evaluator depending
-        /// on whether it is for Update function or for map function
+        /// IAllocatedEvaluator handler. It will take the following action based on the system state:
+        /// Case WaitingForEvaluator
+        ///    Add Evaluator to the Evaluator Manager
+        ///    submit Context and Services
+        /// Case Fail
+        ///    Dispose the evaluator without submitting anything. This is because the code that sets system Fail has executed FailedAction. It has shut down all the allocated evaluators/contexts. 
+        ///    If a new IAllocatedEvaluator comes after it, nothing is submitted to it, so that the evaluator is returned.
+        /// Other cases - not expected
         /// </summary>
         /// <param name="allocatedEvaluator">The allocated evaluator</param>
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
-            var configs =
-                _serviceAndContextConfigurationProvider.GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id);
-            allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service);
+            Logger.Log(Level.Info, "AllocatedEvaluator EvaluatorBatchId [{0}], memory [{1}], systemState {2}.", allocatedEvaluator.EvaluatorBatchId, allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState);
+            lock (_lock)
+            {
+                using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator"))
+                {
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.WaitingForEvaluator:
+                            _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator);
+                            SubmitContextAndService(allocatedEvaluator);
+                            break;
+                        case SystemState.Fail:
+                            Logger.Log(Level.Info,
+                                "Receiving IAllocatedEvaluator event, but system is in FAIL state, ignore it.");
+                            allocatedEvaluator.Dispose();
+                            break;
+                        default:
+                            UnexpectedState(allocatedEvaluator.Id, "IAllocatedEvaluator");
+                            break;
+                    }
+                }
+            }
         }
 
         /// <summary>
-        /// Specifies the Map or Update task to run on the active context
+        /// Gets context and service configuration for evaluator depending
+        /// on whether it is for update/master function or for mapper function.
+        /// Then submits Context and Service with the corresponding configuration
         /// </summary>
-        /// <param name="activeContext"></param>
-        public void OnNext(IActiveContext activeContext)
+        /// <param name="allocatedEvaluator">The allocated evaluator to submit the context and service configuration to</param>
+        private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator)
         {
-            Logger.Log(Level.Verbose, string.Format("Received Active Context {0}", activeContext.Id));
-
-            if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId))
+            ContextAndServiceConfiguration configs;
+            if (_evaluatorManager.IsEvaluatorForMaster(allocatedEvaluator))
             {
-                Logger.Log(Level.Verbose, "Submitting master task");
-                _commGroup.AddTask(IMRUConstants.UpdateTaskName);
-                _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), activeContext);
-                RequestMapEvaluators(_totalMappers);
+                configs =
+                    _serviceAndContextConfigurationProvider
+                        .GetContextConfigurationForMasterEvaluatorById(
+                            allocatedEvaluator.Id);
             }
             else
             {
-                Logger.Log(Level.Verbose, "Submitting map task");
-                _serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId);
-                string taskId = GetTaskIdByEvaluatorId(activeContext.EvaluatorId);
-                _commGroup.AddTask(taskId);
-                _groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), activeContext);
-                Interlocked.Increment(ref _numberOfReadyTasks);
-                Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready for submission", _numberOfReadyTasks));
+                configs = _serviceAndContextConfigurationProvider
+                    .GetDataLoadingConfigurationForEvaluatorById(
+                        allocatedEvaluator.Id);
             }
+            allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service);
         }
+        #endregion IAllocatedEvaluator
 
+        #region IActiveContext
         /// <summary>
-        /// Specifies what to do when the task is completed
-        /// In this case just disposes off the task
+        /// IActiveContext handler. It will take the following actions based on the system state:
+        /// Case WaitingForEvaluator:
+        ///    Adds Active Context to Active Context Manager
+        /// Case Fail:
+        ///    Closes the ActiveContext
+        /// Other cases - not expected
         /// </summary>
-        /// <param name="completedTask">The link to the completed task</param>
-        public void OnNext(ICompletedTask completedTask)
+        /// <param name="activeContext">The active context that was received</param>
+        public void OnNext(IActiveContext activeContext)
         {
-            lock (_completedTasks)
+            Logger.Log(Level.Info, "Received Active Context {0}, systemState {1}.", activeContext.Id, _systemState.CurrentState);
+            lock (_lock)
             {
-                Logger.Log(Level.Info,
-                    string.Format("Received completed task message from task Id: {0}", completedTask.Id));
-                _completedTasks.Add(completedTask);
-
-                if (AreIMRUTasksCompleted())
+                using (Logger.LogFunction("IMRUDriver::IActiveContext"))
                 {
-                    ShutDownAllEvaluators();
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.WaitingForEvaluator:
+                            _contextManager.Add(activeContext);
+                            break;
+                        case SystemState.Fail:
+                            Logger.Log(Level.Info,
+                                "Received IActiveContext event, but system is in FAIL state. Closing the context.");
+                            activeContext.Dispose();
+                            break;
+                        default:
+                            UnexpectedState(activeContext.Id, "IActiveContext");
+                            break;
+                    }
                 }
             }
         }
+        #endregion IActiveContext
 
+        #region submit tasks
         /// <summary>
-        /// Specifies what to do when evaluator fails.
-        /// If we get all completed tasks then ignore the failure
-        /// Else request a new evaluator. If failure happens in middle of IMRU 
-        /// job we expect neighboring evaluators to fail while doing 
-        /// communication and will use FailedTask and FailedContext logic to 
-        /// order shutdown.
+        /// Called from ActiveContextManager when all the expected active contexts have been received.
+        /// It changes the system state and then calls SubmitTasks().
         /// </summary>
         /// <param name="value"></param>
-        public void OnNext(IFailedEvaluator value)
+        public void OnNext(IEnumerable<IActiveContext> value)
         {
-            if (AreIMRUTasksCompleted())
+            Logger.Log(Level.Info, "Received event from ActiveContextManager with NumberOfActiveContexts:" + (value != null ? value.Count() : 0));
+            lock (_lock)
             {
-                Logger.Log(Level.Info,
-                    string.Format("Evaluator with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
-                return;
+                // When the event AllContextsAreReady happens, change the system state from WaitingForEvaluator to SubmittingTasks
+                _systemState.MoveNext(SystemStateEvent.AllContextsAreReady);
+                SubmitTasks(value);
             }
+        }
 
-            Logger.Log(Level.Info,
-                string.Format("Evaluator with Id: {0} failed with Exception: {1}", value.Id, value.EvaluatorException));
-            int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators);
-            if (currFailedEvaluators > _allowedFailedEvaluators)
+        /// <summary>
+        /// This method is responsible for preparing the task submission and then calling SubmitTasks in TaskManager.
+        /// It is called in both first time and recovery scenarios.
+        /// Creates a new Communication Group and adds Group Communication Operators
+        /// For each context, adds a task to the communication group.
+        /// After all the tasks are added to the group, for each task, gets GroupCommTaskConfiguration from IGroupCommDriver 
+        /// and merges it with the task configuration.
+        /// When all the tasks are added, calls TaskManager to SubmitTasks().
+        /// </summary>
+        private void SubmitTasks(IEnumerable<IActiveContext> activeContexts)
+        {
+            Logger.Log(Level.Info, "SubmitTasks with system state : {0} at time: {1}.", _systemState.CurrentState, DateTime.Now);
+            using (Logger.LogFunction("IMRUDriver::SubmitTasksConfiguration"))
             {
-                Exceptions.Throw(new MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators),
-                    Logger);
-            }
+                if (!_isFirstTry)
+                {
+                    _groupCommDriver.RemoveCommunicationGroup(IMRUConstants.CommunicationGroupName);
+                }
 
-            _serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id);
-            bool isMaster = _serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id);
+                UpdateMaterTaskId();
+                _taskManager = new TaskManager(_totalMappers + 1, _groupCommDriver.MasterTaskId);
+                var commGroup = AddCommunicationGroupWithOperators();
+                _perMapperConfigurationStack = ConstructPerMapperConfigStack(_totalMappers);
+
+                var taskIdAndContextMapping = new Dictionary<string, IActiveContext>();
+                foreach (var activeContext in activeContexts)
+                {
+                    var taskId = _evaluatorManager.IsMasterEvaluatorId(activeContext.EvaluatorId)
+                        ? _groupCommDriver.MasterTaskId
+                        : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId);
+                    commGroup.AddTask(taskId);
+                    taskIdAndContextMapping.Add(taskId, activeContext);
+                }
+
+                foreach (var mapping in taskIdAndContextMapping)
+                {
+                    var taskConfig = _evaluatorManager.IsMasterEvaluatorId(mapping.Value.EvaluatorId)
+                        ? GetMasterTaskConfiguration(mapping.Key)
+                        : GetMapperTaskConfiguration(mapping.Value, mapping.Key);
+                    var groupCommTaskConfiguration = _groupCommDriver.GetGroupCommTaskConfiguration(mapping.Key);
+                    var mergedTaskConf = Configurations.Merge(taskConfig, groupCommTaskConfiguration);
+                    _taskManager.AddTask(mapping.Key, mergedTaskConf, mapping.Value);
+                }
+            }
+            _taskManager.SubmitTasks();
+        }
 
-            // If failed evaluator is master then ask for master 
-            // evaluator else ask for mapper evaluator
-            if (!isMaster)
+        private void UpdateMaterTaskId()
+        {
+            if (_isFirstTry)
             {
-                Logger.Log(Level.Info, string.Format("Requesting a replacement map Evaluator for {0}", value.Id));
-                RequestMapEvaluators(1);
+                _groupCommDriver.MasterTaskId = _groupCommDriver.MasterTaskId + "-" + _numberOfRetries;
             }
             else
             {
-                Logger.Log(Level.Info, string.Format("Requesting a replacement master Evaluator for {0}", value.Id));
-                RequestUpdateEvaluator();
+                _groupCommDriver.MasterTaskId =
+                    _groupCommDriver.MasterTaskId.Substring(0, _groupCommDriver.MasterTaskId.Length - 1) +
+                    _numberOfRetries;
             }
         }
+        #endregion submit tasks
 
+        #region IRunningTask
         /// <summary>
-        /// Specifies what to do if Failed Context is received.
-        /// An exception is thrown if tasks are not completed.
+        /// IRunningTask handler. The method is called when a task is running. The following action will be taken based on the system state:
+        /// Case SubmittingTasks
+        ///     Add it to RunningTasks and set task state to TaskRunning
+        ///     When all the tasks are running, change system state to TasksRunning
+        /// Case ShuttingDown/Fail
+        ///     Call TaskManager to record RunningTask during SystemFailure
+        /// Other cases - not expected 
         /// </summary>
-        /// <param name="value"></param>
-        public void OnNext(IFailedContext value)
+        /// <param name="runningTask"></param>
+        public void OnNext(IRunningTask runningTask)
         {
-            if (AreIMRUTasksCompleted())
+            Logger.Log(Level.Info, "Received IRunningTask {0} from endpoint {1} at SystemState {2} retry # {3}.", runningTask.Id, GetEndPointFromTaskId(runningTask.Id), _systemState.CurrentState, _numberOfRetries);
+            lock (_lock)
             {
-                Logger.Log(Level.Info,
-                    string.Format("Context with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
-                return;
+                using (Logger.LogFunction("IMRUDriver::IRunningTask"))
+                {
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.SubmittingTasks:
+                            _taskManager.RecordRunningTask(runningTask);
+                            if (_taskManager.AreAllTasksRunning())
+                            {
+                                _systemState.MoveNext(SystemStateEvent.AllTasksAreRunning);
+                                Logger.Log(Level.Info,
+                                    "All tasks are running, SystemState {0}",
+                                    _systemState.CurrentState);
+                            }
+                            break;
+                        case SystemState.ShuttingDown:
+                        case SystemState.Fail:
+                            _taskManager.RecordRunningTaskDuringSystemFailure(runningTask, TaskManager.CloseTaskByDriver);
+                            break;
+                        default:
+                            UnexpectedState(runningTask.Id, "IRunningTask");
+                            break;
+                    }
+                }
             }
-            Exceptions.Throw(new Exception(string.Format("Data Loading Context with Id: {0} failed", value.Id)), Logger);
         }
+        #endregion IRunningTask
 
+        #region ICompletedTask
         /// <summary>
-        /// Specifies what to do if a task fails.
-        /// We throw the exception and fail IMRU unless IMRU job is already done.
+        /// ICompletedTask handler. It is called when a task is completed. The following action will be taken based on the System State:
+        /// Case TasksRunning
+        ///     Updates task state to TaskCompleted
+        ///     If all tasks are completed, sets system state to TasksCompleted and then goes to Done action
+        /// Case ShuttingDown
+        ///     Updates task state to TaskCompleted
+        ///     Try to recover
+        /// Other cases - not expected 
         /// </summary>
-        /// <param name="value"></param>
-        public void OnNext(IFailedTask value)
+        /// <param name="completedTask">The link to the completed task</param>
+        public void OnNext(ICompletedTask completedTask)
         {
-            if (AreIMRUTasksCompleted())
+            Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries);
+            lock (_lock)
             {
-                Logger.Log(Level.Info,
-                    string.Format("Task with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
-                return;
+                switch (_systemState.CurrentState)
+                {
+                    case SystemState.TasksRunning:
+                        _taskManager.RecordCompletedTask(completedTask);
+                        if (_taskManager.AreAllTasksCompleted())
+                        {
+                            _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
+                            Logger.Log(Level.Info, "All tasks are completed, systemState {0}", _systemState.CurrentState);
+                            DoneAction();
+                        }
+                        break;
+                    case SystemState.ShuttingDown:
+                        // The task might be in running state or waiting for close, record the completed task
+                        _taskManager.RecordCompletedTask(completedTask);
+                        TryRecovery();
+                        break;
+                    default:
+                        UnexpectedState(completedTask.Id, "ICompletedTask");
+                        break;
+                }
+            }
+        }
+        #endregion ICompletedTask
+
+        #region IFailedEvaluator
+        /// <summary>
+        /// IFailedEvaluator handler. It specifies what to do when an evaluator fails.
+        /// If we get all completed tasks then ignore the failure. Otherwise, take the following actions based on the system state: 
+        /// Case WaitingForEvaluator
+        ///     This happens in the middle of submitting contexts. We just need to remove the failed evaluator 
+        ///     from EvaluatorManager and remove associated active context, if any, from ActiveContextManager
+        ///     then checks if the system is recoverable. If yes, request another Evaluator 
+        ///     If not recoverable, set system state to Fail then execute Fail action
+        /// Case SubmittingTasks/TasksRunning
+        ///     This happens either in the middle of Task submitting or all the tasks are running
+        ///     Changes the system state to ShuttingDown
+        ///     Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
+        ///     Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure
+        ///     Closes all the other running tasks
+        ///     Try to recover in case it is the last failure received
+        /// Case ShuttingDown
+        ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
+        ///     Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
+        ///     Removes associated task from running task if it was running, changes the task state to ClosedTask if it was waiting for close
+        ///     otherwise changes the task state to FailedTaskEvaluatorError
+        ///     Try to recover in case it is the last failure received
+        /// Other cases - not expected 
+        /// </summary>
+        /// <param name="failedEvaluator"></param>
+        public void OnNext(IFailedEvaluator failedEvaluator)
+        {
+            var endpoint = failedEvaluator.FailedTask.IsPresent()
+               ? GetEndPoint(failedEvaluator.FailedTask.Value)
+               : failedEvaluator.FailedContexts.Any()
+                   ? GetEndPointFromContext(failedEvaluator.FailedContexts.First())
+                   : "unknown_endpoint";
+
+            Logger.Log(Level.Warning, "Received IFailedEvaluator {0} from endpoint {1} with systemState {2} in retry# {3} with Exception: {4}.", failedEvaluator.Id, endpoint, _systemState.CurrentState, _numberOfRetries, failedEvaluator.EvaluatorException);
+
+            lock (_lock)
+            {
+                using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
+                {
+                    if (_taskManager != null && _taskManager.AreAllTasksCompleted())
+                    {
+                        Logger.Log(Level.Verbose,
+                            "All IMRU tasks have been completed. So ignoring the Evaluator {0} failure.",
+                            failedEvaluator.Id);
+                        return;
+                    }
+
+                    var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
+                    _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
+                    _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
+
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.WaitingForEvaluator:
+                            if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures())
+                            {
+                                if (isMaster)
+                                {
+                                    Logger.Log(Level.Info, "Requesting a master Evaluator.");
+                                    _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
+                                    _evaluatorManager.RequestUpdateEvaluator();
+                                }
+                                else
+                                {
+                                    _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+                                        failedEvaluator.Id);
+                                    Logger.Log(Level.Info, "Requesting mapper Evaluators.");
+                                    _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
+                                    _evaluatorManager.RequestMapEvaluators(1);
+                                }
+                            }
+                            else
+                            {
+                                Logger.Log(Level.Error, "The system is not recoverable, change the state to Fail.");
+                                _systemState.MoveNext(SystemStateEvent.NotRecoverable);
+                                FailAction();
+                            }
+                            break;
+
+                        case SystemState.SubmittingTasks:
+                        case SystemState.TasksRunning:
+                            // When the event FailedNode happens, change the system state to ShuttingDown
+                            _systemState.MoveNext(SystemStateEvent.FailedNode);
+                            _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+                            _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+                            // Push evaluator id back to PartitionIdProvider if it is not master
+                            if (!isMaster)
+                            {
+                                _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+                                    failedEvaluator.Id);
+                            }
+
+                            TryRecovery();
+                            break;
+
+                        case SystemState.ShuttingDown:
+                            _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+
+                            // Push evaluator id back to PartitionIdProvider if it is not master
+                            if (!isMaster)
+                            {
+                                _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+                                    failedEvaluator.Id);
+                            }
+                            TryRecovery();
+                            break;
+
+                        case SystemState.Fail:
+                            break;
+
+                        default:
+                            UnexpectedState(failedEvaluator.Id, "IFailedEvaluator");
+                            break;
+                    }
+                }
+            }
+        }
+        #endregion IFailedEvaluator
+
+        #region IFailedContext
+        /// <summary>
+        /// IFailedContext handler. It specifies what to do if Failed Context is received.
+        /// If we get all completed tasks then ignore the failure otherwise throw exception
+        /// Fault tolerant would be similar to FailedEvaluator.
+        /// </summary>
+        /// <param name="failedContext"></param>
+        public void OnNext(IFailedContext failedContext)
+        {
+            lock (_lock)
+            {
+                if (_taskManager.AreAllTasksCompleted())
+                {
+                    Logger.Log(Level.Info, "Context with Id: {0} failed but IMRU tasks are completed. So ignoring.", failedContext.Id);
+                    return;
+                }
+
+                var msg = string.Format("Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
+                Exceptions.Throw(new Exception(msg), Logger);
+            }
+        }
+        #endregion IFailedContext
+
+        #region IFailedTask
+        /// <summary>
+        /// IFailedTask handler. It specifies what to do when task fails.
+        /// If we get all completed tasks then ignore the failure. Otherwise take the following actions based on the System state:
+        /// Case SubmittingTasks/TasksRunning
+        ///     This is the first failure received
+        ///     Changes the system state to ShuttingDown
+        ///     Record failed task in TaskManager
+        ///     Closes all the other running tasks and set their state to TaskWaitingForClose
+        ///     Try to recover
+        /// Case ShuttingDown
+        ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
+        ///     Record failed task in TaskManager.
+        ///     Try to recover
+        /// Other cases - not expected 
+        /// </summary>
+        /// <param name="failedTask"></param>
+        public void OnNext(IFailedTask failedTask)
+        {
+            Logger.Log(Level.Warning, "Received IFailedTask with Id: {0} and message: {1} from endpoint {2} with systemState {3} in retry#: {4}.", failedTask.Id, failedTask.Message, GetEndPointFromContext(failedTask.GetActiveContext()), _systemState.CurrentState, _numberOfRetries);
+            lock (_lock)
+            {
+                using (Logger.LogFunction("IMRUDriver::IFailedTask"))
+                {
+                    if (_taskManager.AreAllTasksCompleted())
+                    {
+                        Logger.Log(Level.Info,
+                            "Task with Id: {0} failed but all IMRU tasks are completed. So ignoring.",
+                            failedTask.Id);
+                        return;
+                    }
+
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.SubmittingTasks:
+                        case SystemState.TasksRunning:
+                            // When the event FailedNode happens, change the system state to ShuttingDown
+                            _systemState.MoveNext(SystemStateEvent.FailedNode);
+                            _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
+                            _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+                            TryRecovery();
+                            break;
+
+                        case SystemState.ShuttingDown:
+                            _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask);
+                            TryRecovery();
+                            break;
+
+                        default:
+                            UnexpectedState(failedTask.Id, "IFailedTask");
+                            break;
+                    }
+                }
             }
-            Exceptions.Throw(new Exception(string.Format("Task with Id: {0} failed", value.Id)), Logger);
         }
+        #endregion IFailedTask
 
         public void OnError(Exception error)
         {
@@ -282,42 +653,154 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
         }
 
-        private bool AreIMRUTasksCompleted()
+        private void UnexpectedState(string id, string eventName)
         {
-            return _completedTasks.Count >= _totalMappers + 1;
+            var msg = string.Format(CultureInfo.InvariantCulture,
+                "Received {0} for [{1}], but system status is {2}.",
+                eventName,
+                id,
+                _systemState.CurrentState);
+            Exceptions.Throw(new IMRUSystemException(msg), Logger);
         }
 
-        private string GetTaskIdByEvaluatorId(string evaluatorId)
+        /// <summary>
+        /// Once all the tasks are in a final state, starts recovery if the system is recoverable;
+        /// otherwise, changes the system state to Fail and then takes the Fail action.
+        /// </summary>
+        private void TryRecovery()
         {
-            return string.Format("{0}-{1}-Version0",
+            if (_taskManager.AreAllTasksInFinalState())
+            {
+                if (IsRecoverable())
+                {
+                    _isFirstTry = false;
+                    RecoveryAction();
+                }
+                else
+                {
+                    Logger.Log(Level.Warning, "The system is not recoverable, change the state to Fail.");
+                    _systemState.MoveNext(SystemStateEvent.NotRecoverable);
+                    FailAction();
+                }
+            }
+        }
+
+        private string GetMapperTaskIdByEvaluatorId(string evaluatorId)
+        {
+            return string.Format("{0}-{1}-{2}",
                 IMRUConstants.MapTaskPrefix,
-                _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId));
+                _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId),
+                _numberOfRetries);
+        }
+
+        /// <summary>
+        /// This method is called when all the tasks are successfully completed. 
+        /// </summary>
+        private void DoneAction()
+        {
+            ShutDownAllEvaluators();
+            Logger.Log(Level.Info, "DoneAction done in retry {0}!!!", _numberOfRetries);
+        }
+
+        /// <summary>
+        /// This method is called when there are failures and the system is not recoverable. 
+        /// </summary>
+        private void FailAction()
+        {
+            ShutDownAllEvaluators();
+            var msg = string.Format(CultureInfo.InvariantCulture,
+                "The system cannot be recovered after {0} retries. NumberofFailedMappers in the last try is {1}.",
+                _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
+            Exceptions.Throw(new ApplicationException(msg), Logger);
         }
 
         /// <summary>
-        /// Shuts down evaluators once all completed task messages are received
+        /// Shuts down evaluators
         /// </summary>
         private void ShutDownAllEvaluators()
         {
-            foreach (var task in _completedTasks)
+            foreach (var context in _contextManager.ActiveContexts)
             {
-                Logger.Log(Level.Info, string.Format("Disposing task: {0}", task.Id));
-                task.ActiveContext.Dispose();
+                Logger.Log(Level.Verbose, "Disposing active context: {0}", context.Id);
+                context.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// This method is called for recovery. It resets Failed Evaluators and changes state to WaitingForEvaluator
+        /// If there are no failed mappers, meaning the recovery is caused by failed tasks, resubmit all the tasks. 
+        /// Else, based on the number of failed evaluators, requests missing map evaluators
+        /// </summary>
+        private void RecoveryAction()
+        {
+            lock (_lock)
+            {
+                _numberOfRetries++;
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                    "Start recovery with _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}.",
+                    _numberOfRetries,
+                    _evaluatorManager.NumberofFailedMappers());
+                Logger.Log(Level.Info, msg);
+
+                _systemState.MoveNext(SystemStateEvent.Recover);
+
+                var mappersToRequest = _evaluatorManager.NumberofFailedMappers();
+                _evaluatorManager.ResetFailedEvaluators();
+
+                if (mappersToRequest == 0)
+                {
+                    Logger.Log(Level.Info, "There is no failed Evaluator in this recovery but failed tasks.");
+                    if (_contextManager.AreAllContextsReceived)
+                    {
+                        OnNext(_contextManager.ActiveContexts);
+                    }
+                    else
+                    {
+                        Exceptions.Throw(new IMRUSystemException("In recovery, there are no Failed evaluators but not all the contexts are received"), Logger);
+                    }
+                }
+                else
+                {
+                    Logger.Log(Level.Info, "Requesting {0} map Evaluators.", mappersToRequest);
+                    _evaluatorManager.RequestMapEvaluators(mappersToRequest);
+                }
             }
         }
 
         /// <summary>
-        /// Generates map task configuration given the active context. 
+        /// Checks if the system is recoverable.
+        /// </summary>
+        /// <returns></returns>
+        private bool IsRecoverable()
+        {
+            var msg = string.Format(CultureInfo.InvariantCulture,
+                "IsRecoverable: _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}, NumberOfAppErrors {2}, IsMasterEvaluatorFailed {3} AllowedNumberOfEvaluatorFailures {4}, _maxRetryNumberForFaultTolerant {5}.",
+                _numberOfRetries,
+                _evaluatorManager.NumberofFailedMappers(),
+                _taskManager.NumberOfAppErrors(),
+                _evaluatorManager.IsMasterEvaluatorFailed(),
+                _evaluatorManager.AllowedNumberOfEvaluatorFailures,
+                _maxRetryNumberForFaultTolerant);
+            Logger.Log(Level.Info, msg);
+
+            return !_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()
+                && _taskManager.NumberOfAppErrors() == 0
+                && !_evaluatorManager.IsMasterEvaluatorFailed()
+                && _numberOfRetries < _maxRetryNumberForFaultTolerant;
+        }
+
+        /// <summary>
+        /// Generates map task configuration given the active context.
         /// Merge configurations of all the inputs to the MapTaskHost.
         /// </summary>
         /// <param name="activeContext">Active context to which task needs to be submitted</param>
         /// <param name="taskId">Task Id</param>
         /// <returns>Map task configuration</returns>
-        private IConfiguration GetMapTaskConfiguration(IActiveContext activeContext, string taskId)
+        private IConfiguration GetMapperTaskConfiguration(IActiveContext activeContext, string taskId)
         {
             IConfiguration mapSpecificConfig;
 
-            if (!_perMapperConfiguration.TryPop(out mapSpecificConfig))
+            if (!_perMapperConfigurationStack.TryPop(out mapSpecificConfig))
             {
                 Exceptions.Throw(
                     new IMRUSystemException(string.Format("No per map configuration exist for the active context {0}",
@@ -343,13 +826,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Merge configurations of all the inputs to the UpdateTaskHost.
         /// </summary>
         /// <returns>Update task configuration</returns>
-        private IConfiguration GetUpdateTaskConfiguration()
+        private IConfiguration GetMasterTaskConfiguration(string taskId)
         {
             var partialTaskConf =
                 TangFactory.GetTang()
                     .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule
                         .Set(TaskConfiguration.Identifier,
-                            IMRUConstants.UpdateTaskName)
+                            taskId)
                         .Set(TaskConfiguration.Task,
                             GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
                         .Set(TaskConfiguration.OnClose,
@@ -384,8 +867,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Generate the group communicaiton configuration to be added 
-        /// to the tasks
+        /// Creates the group communication configuration to be added to the tasks
         /// </summary>
         /// <returns>The group communication configuration</returns>
         private IConfiguration GetGroupCommConfiguration()
@@ -406,12 +888,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Adds broadcast and reduce operators to the default communication group
         /// </summary>
-        private void AddGroupCommunicationOperators()
+        private ICommunicationGroupDriver AddCommunicationGroupWithOperators()
         {
             var reduceFunctionConfig = _configurationManager.ReduceFunctionConfiguration;
             var mapOutputPipelineDataConverterConfig = _configurationManager.MapOutputPipelineDataConverterConfiguration;
             var mapInputPipelineDataConverterConfig = _configurationManager.MapInputPipelineDataConverterConfiguration;
 
+            // TODO check the specific exception type 
             try
             {
                 TangFactory.GetTang()
@@ -452,25 +935,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                         .Build();
             }
 
-            _commGroup =
-                _groupCommDriver.DefaultGroup
+            var commGroup =
+                _groupCommDriver.NewCommunicationGroup(IMRUConstants.CommunicationGroupName, _totalMappers + 1)
                     .AddBroadcast<MapInputWithControlMessage<TMapInput>>(
                         IMRUConstants.BroadcastOperatorName,
-                        IMRUConstants.UpdateTaskName,
+                        _groupCommDriver.MasterTaskId,
                         TopologyTypes.Tree,
                         mapInputPipelineDataConverterConfig)
                     .AddReduce<TMapOutput>(
                         IMRUConstants.ReduceOperatorName,
-                        IMRUConstants.UpdateTaskName,
+                        _groupCommDriver.MasterTaskId,
                         TopologyTypes.Tree,
                         reduceFunctionConfig,
                         mapOutputPipelineDataConverterConfig)
                     .Build();
+
+            return commGroup;
         }
 
         /// <summary>
-        /// Construct the stack of map configuraion which 
-        /// is specific to each mapper. If user does not 
+        /// Construct the stack of map configuration which is specific to each mapper. If user does not 
         /// specify any then its empty configuration
         /// </summary>
         /// <param name="totalMappers">Total mappers</param>
@@ -490,30 +974,47 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Request map evaluators from resource manager
+        /// Looks up the endpoint for the given task id in the name server
         /// </summary>
-        /// <param name="numEvaluators">Number of evaluators to request</param>
-        private void RequestMapEvaluators(int numEvaluators)
+        /// <param name="taskId">Registered identifier in name server</param>
+        /// <returns>The endpoint as a string, or null if the name server has no entry for the id</returns>
+        private string GetEndPointFromTaskId(string taskId)
         {
-            _evaluatorRequestor.Submit(
-                _evaluatorRequestor.NewBuilder()
-                    .SetMegabytes(_memoryPerMapper)
-                    .SetNumber(numEvaluators)
-                    .SetCores(_coresPerMapper)
-                    .Build());
+            List<string> t = new List<string>();
+            t.Add(taskId);
+            var ips = _nameServer.Lookup(t);
+            if (ips.Count > 0)
+            {
+                var ip = ips.FirstOrDefault();
+                if (ip != null)
+                {
+                    return ip.Endpoint.ToString();
+                }
+            }
+            return null;
         }
 
-        /// <summary>
-        /// Request update/master evaluator from resource manager
-        /// </summary>
-        private void RequestUpdateEvaluator()
-        {
-            _evaluatorRequestor.Submit(
-                _evaluatorRequestor.NewBuilder()
-                    .SetCores(_coresForUpdateTask)
-                    .SetMegabytes(_memoryForUpdateTask)
-                    .SetNumber(1)
-                    .Build());
+        private string GetEndPoint(IFailedTask failedTask)
+        { 
+            return GetEndPointFromTaskId(failedTask.Id) ?? GetEndPointFromContext(failedTask.GetActiveContext()); 
+        }
+
+        private string GetEndPointFromContext(IFailedContext context)
+        { 
+            if (context == null || context.EvaluatorDescriptor == null || context.EvaluatorDescriptor.NodeDescriptor == null) 
+            { 
+                return null; 
+            } 
+            return context.EvaluatorDescriptor.NodeDescriptor.HostName; 
+        } 
+ 
+        private string GetEndPointFromContext(Optional<IActiveContext> context)
+        { 
+            if (!context.IsPresent() || context.Value == null || context.Value.EvaluatorDescriptor == null || context.Value.EvaluatorDescriptor.NodeDescriptor == null) 
+            { 
+                return null; 
+            } 
+            return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName; 
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 36916db..24a2b9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -17,36 +17,38 @@
 
 using System;
 using System.Collections.Generic;
+using System.Globalization;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Events;
 using Org.Apache.REEF.Common.Services;
 using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 {
     /// <summary>
-    /// Class that handles failed evaluators and also provides Service 
-    /// and Context configuration
+    /// Class that provides Service and Context configuration
     /// </summary>
     /// <typeparam name="TMapInput"></typeparam>
     /// <typeparam name="TMapOutput"></typeparam>
     /// <typeparam name="TPartitionType"></typeparam>
+    [NotThreadSafe]
     internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
 
         private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();
-        private readonly ISet<string> _submittedEvaluators = new HashSet<string>();
-        private readonly ISet<string> _contextLoadedEvaluators = new HashSet<string>(); 
-        private readonly object _lock = new object();
         private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();
         private readonly IPartitionedInputDataSet _dataset;
-        private string _masterEvaluatorId;
 
+        /// <summary>
+        /// Constructs the object which maintains partition descriptor Ids in order to provide the proper data loading configuration
+        /// </summary>
+        /// <param name="dataset"></param>
         internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
         {
             _dataset = dataset;
@@ -57,120 +59,34 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Handles failed evaluator. Moves the id from 
-        /// submitted evaluator to failed evaluator
+        /// Handles failed evaluator. Push the partitionId back to Partition Descriptor Id stack and 
+        /// remove evaluatorId from Partition Id Provider collection
         /// </summary>
         /// <param name="evaluatorId"></param>
         /// <returns>Whether failed evaluator is master or not</returns>
-        internal bool RecordEvaluatorFailureById(string evaluatorId)
+        internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId)
         {
-            lock (_lock)
+            if (!_partitionIdProvider.ContainsKey(evaluatorId))
             {
-                string msg;
-                bool isMaster = IsMasterEvaluatorId(evaluatorId);
-
-                if (_contextLoadedEvaluators.Contains(evaluatorId))
-                {
-                    msg =
-                        string.Format(
-                            "Failed evaluator:{0} already had context loaded. Cannot handle failure at this stage",
-                            evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                if (!_submittedEvaluators.Contains(evaluatorId))
-                {
-                    msg = string.Format("Failed evaluator:{0} was never submitted", evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                if (!_partitionIdProvider.ContainsKey(evaluatorId) && !isMaster)
-                {
-                    msg = string.Format("Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                _submittedEvaluators.Remove(evaluatorId);
-
-                if (isMaster)
-                {
-                    Logger.Log(Level.Info, "Failed Evaluator is Master");
-                    _masterEvaluatorId = null;
-                    return true;
-                }
-                
-                Logger.Log(Level.Info, "Failed Evaluator is a Mapper");
-                _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
-                _partitionIdProvider.Remove(evaluatorId);
-                return false;
+                var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
+                Exceptions.Throw(new Exception(msg), Logger);
             }
+            _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
+            _partitionIdProvider.Remove(evaluatorId);
         }
 
         /// <summary>
-        /// Notifies that active context state has been reached
+        /// Gets Context and Service configuration for Master
         /// </summary>
         /// <param name="evaluatorId"></param>
-        internal void RecordActiveContextPerEvaluatorId(string evaluatorId)
-        {
-            lock (_lock)
-            {
-                if (!_submittedEvaluators.Contains(evaluatorId))
-                {
-                    var msg = string.Format("Evaluator:{0} never loaded data but still reached active context stage",
-                        evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                if (_contextLoadedEvaluators.Contains(evaluatorId))
-                {
-                    var msg = string.Format("Evaluator:{0} already reached the active context stage", evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                _contextLoadedEvaluators.Add(evaluatorId);
-                _submittedEvaluators.Remove(evaluatorId);
-            }
-        }
-
-        /// <summary>
-        /// Gets next context configuration. Either master or mapper.
-        /// </summary>
-        /// <param name="evaluatorId">Evaluator Id</param>
-        /// <returns>The context and service configuration</returns>
-        internal ContextAndServiceConfiguration GetContextConfigurationForEvaluatorById(string evaluatorId)
-        {
-            lock (_lock)
-            {
-                if (_submittedEvaluators.Contains(evaluatorId))
-                {
-                    string msg = string.Format("The context is already submitted to evaluator:{0}", evaluatorId);
-                    Exceptions.Throw(new Exception(msg), Logger);
-                }
-
-                if (_masterEvaluatorId == null)
-                {
-                    Logger.Log(Level.Info, "Submitting root context and service for master");
-                    _masterEvaluatorId = evaluatorId;
-                    _submittedEvaluators.Add(evaluatorId);
-                    return new ContextAndServiceConfiguration(
-                        ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
-                            IMRUConstants.MasterContextId).Build(),
-                        TangFactory.GetTang().NewConfigurationBuilder().Build());
-                }
-
-                Logger.Log(Level.Info, "Submitting root context and service for a map task");
-                return GetDataLoadingConfigurationForEvaluatorById(evaluatorId);
-            }
-        }
-
-        /// <summary>
-        /// Checks whether evaluator id is that of master
-        /// </summary>
-        /// <param name="evaluatorId">Id of evaluator</param>
-        /// <returns>true if id is that of master</returns>
-        internal bool IsMasterEvaluatorId(string evaluatorId)
+        /// <returns></returns>
+        internal ContextAndServiceConfiguration GetContextConfigurationForMasterEvaluatorById(string evaluatorId)
         {
-            return evaluatorId.Equals(_masterEvaluatorId);
+            Logger.Log(Level.Info, "Getting root context and service configuration for master");
+            return new ContextAndServiceConfiguration(
+                ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
+                    IMRUConstants.MasterContextId).Build(),
+                TangFactory.GetTang().NewConfigurationBuilder().Build());
         }
 
         /// <summary>
@@ -180,29 +96,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <returns></returns>
         internal string GetPartitionIdByEvaluatorId(string evaluatorId)
         {
-            lock (_lock)
+            if (!_partitionIdProvider.ContainsKey(evaluatorId))
             {
-                string msg;
-                if (!_submittedEvaluators.Contains(evaluatorId) && !_contextLoadedEvaluators.Contains(evaluatorId))
-                {
-                    msg = string.Format("Context for Evaluator:{0} has never been submitted", evaluatorId);
-                    Exceptions.Throw(new IMRUSystemException(msg), Logger);
-                }
-
-                if (IsMasterEvaluatorId(evaluatorId))
-                {
-                    msg = string.Format("Evaluator:{0} is master and does not get partition", evaluatorId);
-                    Exceptions.Throw(new IMRUSystemException(msg), Logger);
-                }
-
-                if (!_partitionIdProvider.ContainsKey(evaluatorId))
-                {
-                    msg = string.Format("Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
-                    Exceptions.Throw(new IMRUSystemException(msg), Logger);   
-                }
-
-                return _partitionIdProvider[evaluatorId];
+                var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
             }
+
+            return _partitionIdProvider[evaluatorId];
         }
 
         /// <summary>
@@ -211,16 +111,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         /// <param name="evaluatorId"></param>
         /// <returns></returns>
-        private ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
+        internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
         {
-            string msg;
-           
-            if (_contextLoadedEvaluators.Contains(evaluatorId))
-            {
-                msg = string.Format("Evaluator:{0} already has the data loaded", evaluatorId);
-                Exceptions.Throw(new IMRUSystemException(msg), Logger);
-            }
-
             if (_partitionDescriptorIds.Count == 0)
             {
                 Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
@@ -228,8 +120,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             if (_partitionIdProvider.ContainsKey(evaluatorId))
             {
-                msg =
+                var msg =
                     string.Format(
+                        CultureInfo.InvariantCulture,
                         "Evaluator Id:{0} already present in configuration cache, they have to be unique",
                         evaluatorId);
                 Exceptions.Throw(new IMRUSystemException(msg), Logger);
@@ -237,14 +130,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             Logger.Log(Level.Info, "Getting a new data loading configuration");
             _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
-            _submittedEvaluators.Add(evaluatorId);
-
-            msg = string.Format(
-                "Current status: Submitted Evaluators-{0}, Data Loaded Evaluators-{1}, Unused data partitions-{2}",
-                _submittedEvaluators.Count,
-                _contextLoadedEvaluators.Count,
-                _partitionDescriptorIds.Count);
-            Logger.Log(Level.Info, msg);
 
             try
             {
@@ -254,14 +139,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             }
             catch (Exception e)
             {
-                msg = string.Format("Error while trying to access partition descriptor:{0} from dataset",
+                var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset",
                     _partitionIdProvider[evaluatorId]);
                 Exceptions.Throw(e, msg, Logger);
                 return null;
             }
         }
 
-        private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
+        /// <summary>
+        /// Creates service and data loading context configuration for given evaluator id
+        /// </summary>
+        /// <param name="partitionDescriptor"></param>
+        /// <param name="evaluatorId"></param>
+        /// <returns></returns>
+        private static ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
             IPartitionDescriptor partitionDescriptor,
             string evaluatorId)
         {
@@ -286,4 +177,4 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             return new ContextAndServiceConfiguration(contextConf, serviceConf);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index 3bf6d75..a37fa3b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -1,4 +1,4 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -15,15 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
 using System.Text;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
 using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -202,7 +205,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// This method is called when receiving ICompletedTask event during task running or system shutting down.
         /// Removes the task from running tasks if it was running
-        /// Changes the task state from RunningTask to CompletedTask
+        /// Changes the task state from RunningTask to CompletedTask if the task was running
+        /// Changes the task state from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
         /// </summary>
         /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
@@ -233,6 +237,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="failedTask"></param>
         internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
         {
+            Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString());
+
             var taskState = GetTaskState(failedTask.Id);
             if (taskState == StateMachine.TaskState.TaskWaitingForClose)
             {
@@ -260,7 +266,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 {
                     if (!_runningTasks.ContainsKey(taskId))
                     {
-                        var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
+                        var msg = string.Format(CultureInfo.InvariantCulture,
+                            "The task [{0}] doesn't exist in Running Tasks.",
+                            taskId);
                         Exceptions.Throw(new IMRUSystemException(msg), Logger);
                     }
                     _runningTasks.Remove(taskId);
@@ -268,6 +276,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
                 UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
             }
+            else
+            {
+                var taskId = FindTaskAssociatedWithTheEvalutor(failedEvaluator.Id);
+                var taskState = GetTaskState(taskId);
+                if (taskState == StateMachine.TaskState.TaskSubmitted)
+                {
+                    UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+                }
+            }
+        }
+
+        private string FindTaskAssociatedWithTheEvalutor(string evaluatorId)
+        {
+            return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault();
         }
 
         /// <summary>
@@ -346,13 +368,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Gets error type based on the exception type in IFailedTask 
+        /// Gets error type (encoded as TaskStateEvent) based on the exception type in IFailedTask.
+        /// Unknown exceptions, or exceptions that don't belong to the defined IMRU task exceptions,
+        /// are treated as system errors.
         /// </summary>
         /// <param name="failedTask"></param>
         /// <returns></returns>
         private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask)
         {
             var exception = failedTask.AsError();
+            var innerExceptionType = exception.InnerException != null ? exception.InnerException.GetType().ToString() : "InnerException null";
+            var innerExceptionMsg = exception.InnerException != null ? exception.InnerException.Message : "No InnerException";
+
+
+            if (failedTask.GetActiveContext().IsPresent())
+            {
+                Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}, evaluator id: {4}",
+                     failedTask.Id,
+                     exception.GetType(),
+                     innerExceptionType,
+                     innerExceptionMsg,
+                     failedTask.GetActiveContext().Value.EvaluatorId);
+            }
+            else
+            {
+                Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}",
+                     failedTask.Id,
+                     exception.GetType(),
+                     innerExceptionType,
+                     innerExceptionMsg);
+            }
+
             if (exception is IMRUTaskAppException)
             {
                 _numberOfAppErrors++;
@@ -362,10 +408,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             {
                 return TaskStateEvent.FailedTaskCommunicationError;
             }
-            else
+            if (exception is IMRUTaskSystemException)
             {
                 return TaskStateEvent.FailedTaskSystemError;
             }
+
+            // special case for communication error during group communication initialization
+            if (exception is TaskClientCodeException)
+            {
+                // try extract cause and check whether it is InjectionException for GroupCommClient
+                if (exception.InnerException != null &&
+                    exception.InnerException is InjectionException &&
+                    exception.InnerException.Message.Contains("GroupCommClient"))
+                {
+                    Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType:FailedTaskCommunicationError with task id {0}", failedTask.Id);
+                    return TaskStateEvent.FailedTaskCommunicationError;
+                }
+            }
+
+            Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType for un-hanlded exception with task id {0} and exception type {1}", failedTask.Id, exception.GetType());
+            return TaskStateEvent.FailedTaskSystemError;
         }
 
         /// <summary>
@@ -381,9 +443,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// Checks if all the tasks are in final states
         /// </summary>
         /// <returns></returns>
-        internal bool AllInFinalState()
+        internal bool AreAllTasksInFinalState()
         {
-            return _tasks.All(t => t.Value.TaskState.IsFinalState());
+            var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList();
+            var count = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Count();
+
+            if (notInFinalState.Any())
+            {
+                Logger.Log(Level.Info, "Total tasks that are not in final state: {0}, and first 5 are:\r\n {1}", count, string.Join("\r\n", notInFinalState.Select(ToLog)));
+            }
+            else
+            {
+                Logger.Log(Level.Info, "All the tasks are in final state");
+            }
+
+            return !notInFinalState.Any();
+        }
+
+        private string ToLog(KeyValuePair<string, TaskInfo> t)
+        {
+            try
+            {
+                return string.Format("State={0}, taskId={1}, ContextId={2}, evaluatorId={3}, evaluatorHost={4}",
+                    t.Value.TaskState.CurrentState,
+                    t.Key,
+                    t.Value.ActiveContext.Id,
+                    t.Value.ActiveContext.EvaluatorId,
+                    t.Value.ActiveContext.EvaluatorDescriptor.NodeDescriptor.HostName);
+            }
+            catch (Exception ex)
+            {
+                return string.Format("Failed to get task string: {0}", ex);
+            }
         }
 
         /// <summary>
@@ -415,18 +506,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         internal void SubmitTasks()
         {
-            if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+            using (Logger.LogFunction("TaskManager::SubmitTasks"))
             {
-                string msg = string.Format("Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", NumberOfTasks, _totalExpectedTasks);
-                Exceptions.Throw(new IMRUSystemException(msg), Logger);
-            }
+                if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+                {
+                    string msg =
+                        string.Format(
+                            "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].",
+                            NumberOfTasks,
+                            _totalExpectedTasks);
+                    Exceptions.Throw(new IMRUSystemException(msg), Logger);
+                }
 
-            foreach (var taskId in _tasks.Keys)
-            {
-                var taskInfo = GetTaskInfo(taskId);
-                taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
-                UpdateState(taskId, TaskStateEvent.SubmittedTask);
-            }
+                SubmitTask(_masterTaskId);
+
+                foreach (var taskId in _tasks.Keys)
+                {
+                    if (taskId.Equals(_masterTaskId))
+                    {
+                        continue;
+                    }
+                    SubmitTask(taskId);
+                }
+        }
+    }
+
+        private void SubmitTask(string taskId)
+        {
+            Logger.Log(Level.Info, "SubmitTask with task id: {0}.", taskId);
+            var taskInfo = GetTaskInfo(taskId);
+            taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
+            UpdateState(taskId, TaskStateEvent.SubmittedTask);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index bce1e4d..ca2fb85 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -17,7 +17,8 @@
 
 using System;
 using System.IO;
-using System.Text;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
@@ -75,13 +76,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="groupCommunicationsClient">Used to setup the communications.</param>
         /// <param name="taskCloseCoordinator">Task close Coordinator</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param>
+        /// <param name="taskId">task id</param>
         [Inject]
         private MapTaskHost(
             IMapFunction<TMapInput, TMapOutput> mapTask,
             IGroupCommClient groupCommunicationsClient,
             TaskCloseCoordinator taskCloseCoordinator,
-            [Parameter(typeof(InvokeGC))] bool invokeGC)
+            [Parameter(typeof(InvokeGC))] bool invokeGC,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId)
         {
+            Logger.Log(Level.Info, "Entering constructor of MapTaskHost for task id {0}", taskId);
             _mapTask = mapTask;
             _groupCommunicationsClient = groupCommunicationsClient;
             var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
@@ -91,6 +95,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _invokeGC = invokeGC;
             _taskCloseCoordinator = taskCloseCoordinator;
             _cancellationSource = new CancellationTokenSource();
+            Logger.Log(Level.Info, "MapTaskHost initialized.");
         }
 
         /// <summary>
@@ -100,21 +105,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <returns></returns>
         public byte[] Call(byte[] memento)
         {
+            Logger.Log(Level.Info, "Entering MapTaskHost Call().");
             MapControlMessage controlMessage = MapControlMessage.AnotherRound;
-
-            while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop)
+            try
             {
-                if (_invokeGC)
+                while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop)
                 {
-                    Logger.Log(Level.Verbose, "Calling Garbage Collector");
-                    GC.Collect();
-                    GC.WaitForPendingFinalizers();
-                }
+                    if (_invokeGC)
+                    {
+                        Logger.Log(Level.Verbose, "Calling Garbage Collector");
+                        GC.Collect();
+                        GC.WaitForPendingFinalizers();
+                    }
 
-                try
-                {
                     using (
-                    MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive(_cancellationSource))
+                        MapInputWithControlMessage<TMapInput> mapInput =
+                            _dataAndMessageReceiver.Receive(_cancellationSource))
                     {
                         controlMessage = mapInput.ControlMessage;
                         if (controlMessage != MapControlMessage.Stop)
@@ -123,32 +129,77 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                         }
                     }
                 }
-                catch (OperationCanceledException e)
-                {
-                    Logger.Log(Level.Warning, "Received OperationCanceledException in MapTaskHost with message: {0}.", e.Message);
-                    break;
-                }
-                catch (IOException e)
+            }
+            catch (OperationCanceledException e)
+            {
+                Logger.Log(Level.Warning,
+                    "Received OperationCanceledException in MapTaskHost with message: {0}. The cancellation token is: {1}.",
+                    e.Message,
+                    _cancellationSource.IsCancellationRequested);
+            }
+            catch (Exception e)
+            {
+                if (e is IOException || e is TcpClientConnectionException || e is RemotingException ||
+                    e is SocketException)
                 {
-                    Logger.Log(Level.Error, "Received IOException in MapTaskHost with message: {0}.", e.Message);
+                    Logger.Log(Level.Error,
+                        "Received Exception {0} in MapTaskHost with message: {1}. The cancellation token is: {2}.",
+                        e.GetType(),
+                        e.Message,
+                        _cancellationSource.IsCancellationRequested);
                     if (!_cancellationSource.IsCancellationRequested)
                     {
+                        Logger.Log(Level.Error,
+                            "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                            _cancellationSource.IsCancellationRequested);
                         throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
                     }
-                    break;
                 }
-                catch (TcpClientConnectionException e)
+                else if (e is AggregateException)
+                {
+                    Logger.Log(Level.Error,
+                        "Received AggregateException. The cancellation token is: {0}.",
+                        _cancellationSource.IsCancellationRequested);
+                    if (e.InnerException != null)
+                    {
+                        Logger.Log(Level.Error,
+                            "InnerException {0}, with message {1}.",
+                            e.InnerException.GetType(),
+                            e.InnerException.Message);
+                    }
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        if (e.InnerException != null && e.InnerException is IOException)
+                        {
+                            Logger.Log(Level.Error,
+                                "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                                _cancellationSource.IsCancellationRequested);
+                            throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                        }
+                        else
+                        {
+                            throw e;
+                        }
+                    }                   
+                }
+                else
                 {
-                    Logger.Log(Level.Error, "Received TcpClientConnectionException in MapTaskHost with message: {0}.", e.Message);
+                    Logger.Log(Level.Error,
+                       "MapTask is throwing Exception {0}, message {1} with cancellation token: {2} and StackTrace {3}.",
+                       e.GetType(),
+                       e.Message,
+                       _cancellationSource.IsCancellationRequested,
+                       e.StackTrace);
                     if (!_cancellationSource.IsCancellationRequested)
                     {
-                        throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                        throw e;
                     }
-                    break;
                 }
             }
-
-            _taskCloseCoordinator.SignalTaskStopped();
+            finally
+            {
+                _taskCloseCoordinator.SignalTaskStopped();
+            } 
             Logger.Log(Level.Info, "MapTaskHost returned with cancellation token:{0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
index a9014c3..af20809 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
@@ -15,12 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
 using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks.Events;
-using Org.Apache.REEF.IMRU.OnREEF.Driver;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
@@ -56,6 +53,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="cancellationTokenSource"></param>
         internal void HandleEvent(ICloseEvent closeEvent, CancellationTokenSource cancellationTokenSource)
         {
+            Logger.Log(Level.Info, "HandleEvent: The task received close event");
             cancellationTokenSource.Cancel();
             _waitToCloseEvent.Wait();
 


[3/3] reef git commit: [REEF-1251] IMRU Driver handlers for fault tolerant

Posted by ju...@apache.org.
[REEF-1251] IMRU Driver handlers for fault tolerant

* IMRU Driver handler rewrite to support fault tolerance
* ServiceAndContextConfigurationProvider refactor and clean up for the updated IMRU driver
* IMRUClient update to use the updated IMRUDriver
* Allow client to set MaxRetryNumberInRecovery
* Fixes for bugs found during testing
* Add tests for evaluator and task failures on the mapper evaluators and the update-task evaluator

JIRA:
  [REEF-1251](https://issues.apache.org/jira/browse/REEF-1251)
  [REEF-1551](https://issues.apache.org/jira/browse/REEF-1551)
  [REEF-1552](https://issues.apache.org/jira/browse/REEF-1552)
  [REEF-1553](https://issues.apache.org/jira/browse/REEF-1553)

This closes #1087


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b14c8cd8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b14c8cd8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b14c8cd8

Branch: refs/heads/master
Commit: b14c8cd8191b70249a939c1cca25a61e7231a9b0
Parents: d116d94
Author: Julia Wang <ju...@apache.org>
Authored: Mon Jun 6 20:42:29 2016 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Thu Sep 1 17:18:03 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/EvaluatorRuntime.cs       |  11 +-
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |   3 +
 .../Org.Apache.REEF.Driver.csproj               |   4 +-
 .../OnREEFIMRURunTimeConfiguration.cs           |  19 +-
 .../Org.Apache.REEF.IMRU.Examples.csproj        |   1 +
 .../FaultTolerantPipelinedBroadcastAndReduce.cs | 170 ++++
 .../PipelinedBroadcastAndReduce.cs              |  27 +-
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    |  41 +-
 .../TestActiveContextManager.cs                 |  15 +-
 .../TestEvaluatorManager.cs                     |  76 +-
 .../TestTaskManager.cs                          |  54 +-
 .../API/IMRUJobDefinition.cs                    |  12 +
 .../API/IMRUJobDefinitionBuilder.cs             |  14 +
 .../OnREEF/Client/REEFIMRUClient.cs             |   4 +
 .../OnREEF/Driver/ActiveContextManager.cs       |  10 +-
 .../OnREEF/Driver/EvaluatorManager.cs           |  20 +-
 .../OnREEF/Driver/IMRUDriver.cs                 | 821 +++++++++++++++----
 .../ServiceAndContextConfigurationProvider.cs   | 191 +----
 .../OnREEF/Driver/TaskManager.cs                | 144 +++-
 .../OnREEF/IMRUTasks/MapTaskHost.cs             | 103 ++-
 .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs    |   4 +-
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  88 +-
 .../AllowedFailedEvaluatorsFraction.cs          |   2 +-
 .../Parameters/MaxRetryNumberInRecovery.cs      |  29 +
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 .../Group/Driver/IGroupCommDriver.cs            |   2 +-
 .../Group/Driver/Impl/GroupCommDriver.cs        |   2 +-
 .../Exceptions/IllegalStateException.cs         |   7 +
 .../Exceptions/InjectionException.cs            |   9 +-
 .../Functional/IMRU/IMRUBroadcastReduceTest.cs  |  27 +-
 ...oadcastReduceWithFilePartitionDataSetTest.cs |   6 +-
 .../IMRU/IMRUBroadcastReduceWithLocalFile.cs    |   1 +
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |  65 +-
 .../IMRUBrodcastReduceWithoutIMRUClientTest.cs  |   8 +-
 .../Functional/IMRU/IMRUCloseTaskTest.cs        |  26 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs | 173 ++++
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      |  86 ++
 .../Functional/IMRU/TestFailMapperTasks.cs      |  90 ++
 .../Functional/IMRU/TestFailUpdateEvaluator.cs  | 236 ++++++
 .../Org.Apache.REEF.Tests.csproj                |   4 +
 .../Remote/Impl/StreamingLink.cs                |   6 +-
 .../Remote/Impl/StreamingTransportClient.cs     |  33 +-
 .../Remote/Impl/StreamingTransportServer.cs     |  41 +-
 .../Remote/Parameters/ConnectionRetryCount.cs   |   2 +-
 44 files changed, 2176 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index c1448d1..077ba31 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -91,11 +91,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             }
         }
 
+        private string MessageFieldAsText(object field)
+        {
+            return field == null ? "null" : "not null";
+        }
+
         public void Handle(EvaluatorControlProto message)
         {
             lock (_heartBeatManager)
             {
-                Logger.Log(Level.Info, "Handle Evaluator control message");
+                var msg = " done_evaluator = " + MessageFieldAsText(message.done_evaluator)
+                          + " kill_evaluator = " + MessageFieldAsText(message.kill_evaluator)
+                          + " stop_evaluator = " + MessageFieldAsText(message.stop_evaluator)
+                          + " context_control = " + MessageFieldAsText(message.context_control);
+                Logger.Log(Level.Info, "Handle Evaluator control message: " + msg);
                 if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
                 {
                     OnException(new InvalidOperationException(

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 330c7b4..053ef23 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -113,14 +113,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 }
                 catch (TaskStartHandlerException e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException");
                     _currentStatus.SetException(e.InnerException);
                 }
                 catch (TaskStopHandlerException e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException");
                     _currentStatus.SetException(e.InnerException);
                 }
                 catch (Exception e)
                 {
+                    Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType());
                     _currentStatus.SetException(e);
                 }
                 finally

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index c7e55b2..9bebad6 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -30,7 +30,9 @@ under the License.
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
-    <Reference Include="Microsoft.Hadoop.Avro, Version=1.5.6.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL" />
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
     <Reference Include="protobuf-net">
       <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
     </Reference>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
index f47b473..31585da 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
@@ -19,7 +19,7 @@ using System.Globalization;
 using Org.Apache.REEF.Client.Local;
 using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.IMRU.OnREEF.Client;
-using Org.Apache.REEF.IO.FileSystem.Hadoop;
+using Org.Apache.REEF.Network;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Interface;
 
@@ -59,7 +59,7 @@ namespace Org.Apache.REEF.IMRU.Examples
                    .Build();
             }
 
-            return Configurations.Merge(runtimeConfig, imruClientConfig);
+            return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration());
         }
 
         /// <summary>
@@ -71,9 +71,18 @@ namespace Org.Apache.REEF.IMRU.Examples
             IConfiguration imruClientConfig =
                 REEFIMRUClientConfiguration.ConfigurationModule.Build();
 
-            IConfiguration runtimeConfig =
-                YARNClientConfiguration.ConfigurationModule.Build();
-            return Configurations.Merge(runtimeConfig, imruClientConfig);
+            var runtimeConfig = YARNClientConfiguration.ConfigurationModule
+                .Build();
+
+            return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration());
+        }
+
+        private static IConfiguration GetTcpConfiguration()
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000")
+                .Build();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 1a17903..3cb5fd3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,6 +46,7 @@ under the License.
     <Compile Include="MapperCount\IdentityMapFunction.cs" />
     <Compile Include="IntSumReduceFunction.cs" />
     <Compile Include="MapperCount\MapperCount.cs" />
+    <Compile Include="PipelinedBroadcastReduce\FaultTolerantPipelinedBroadcastAndReduce.cs" />
     <Compile Include="SingleIterUpdateFunction.cs" />
     <Compile Include="NaturalSum\NaturalSum.cs" />
     <Compile Include="NaturalSum\NaturalSumMapFunction.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
new file mode 100644
index 0000000..672389c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
@@ -0,0 +1,170 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// IMRU program that performs broadcast and reduce with fault tolerance.
+    /// </summary>
+    public class FaultTolerantPipelinedBroadcastAndReduce : PipelinedBroadcastAndReduce
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce));
+
+        [Inject]
+        protected FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient imruClient) : base(imruClient)
+        {
+        }
+        
+        /// <summary>
+        /// Build a test mapper function configuration
+        /// </summary>
+        /// <param name="maxRetryInRecovery">Number of retries done if first run failed.</param>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery)
+        {
+            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)
+                .Build();
+
+            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), maxRetryInRecovery.ToString())
+                .Build();
+
+            return Configurations.Merge(c1, c2);
+        }
+
+        [NamedParameter(Documentation = "Set of task ids which will produce task/evaluator failure")]
+        public class TaskIdsToFail : Name<ISet<string>>
+        {
+        }
+
+        [NamedParameter(Documentation = "Type of failure to simulate")]
+        public class FailureType : Name<int>
+        {
+            internal static readonly int EvaluatorFailureDuringTaskExecution = 0;
+            internal static readonly int TaskFailureDuringTaskExecution = 1;
+            internal static readonly int EvaluatorFailureDuringTaskInitialization = 2;
+            internal static readonly int TaskFailureDuringTaskInitialization = 3;
+
+            internal static bool IsEvaluatorFailure(int failureType)
+            {
+                return failureType == EvaluatorFailureDuringTaskExecution ||
+                       failureType == EvaluatorFailureDuringTaskInitialization;
+            }
+        }
+
+        /// <summary>
+        /// The function is to simulate Evaluator/Task failure for mapper evaluator
+        /// </summary>
+        public sealed class TestSenderMapFunction : IMapFunction<int[], int[]>
+        {
+            private int _iterations;
+            private readonly string _taskId;
+            private readonly ISet<string> _taskIdsToFail;
+            private int _failureType;
+            private readonly int _maxRetryInRecovery;
+
+            [Inject]
+            private TestSenderMapFunction(
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
+                [Parameter(typeof(FailureType))] int failureType,
+                [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery)
+            {
+                _taskId = taskId;
+                _taskIdsToFail = taskIdsToFail;
+                _failureType = failureType;
+                _maxRetryInRecovery = maxRetryNumberInRecovery;
+                Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1},  Failure type: {2}.", _taskId, _maxRetryInRecovery, _failureType);
+                foreach (var n in _taskIdsToFail)
+                {
+                    Logger.Log(Level.Info, "TestSenderMapFunction: taskIdsToFail: {0}", n);
+                }
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization || 
+                    _failureType == FailureType.TaskFailureDuringTaskInitialization)
+                {
+                    SimulateFailure(0);
+                }
+            }
+
+            /// <summary>
+            /// Map function
+            /// </summary>
+            /// <param name="mapInput">integer array</param>
+            /// <returns>The same integer array</returns>
+            int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+            {
+                _iterations++;
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations);
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskExecution ||
+                    _failureType == FailureType.TaskFailureDuringTaskExecution)
+                {
+                    SimulateFailure(10);
+                }
+
+                if (mapInput[0] != _iterations)
+                {
+                    Exceptions.Throw(new Exception("Expected value in mappers (" + _iterations + ") different from actual value (" + mapInput[0] + ")"), Logger);
+                }
+
+                return mapInput;
+            }
+
+            private void SimulateFailure(int onIteration)
+            {
+                if (_iterations == onIteration &&
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) != null &&
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null)
+                {
+                    Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}",
+                        FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task",
+                        _taskId);
+                    if (FailureType.IsEvaluatorFailure(_failureType))
+                    {
+                        // simulate evaluator failure
+                        Environment.Exit(1);
+                    }
+                    else
+                    {
+                        // simulate task failure
+                        throw new ArgumentNullException("Simulating task failure");
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index de1598c..01c6daa 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Globalization;
 using System.IO;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
@@ -29,12 +31,12 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
     /// <summary>
     /// IMRU program that performs broadcast and reduce
     /// </summary>
-    public sealed class PipelinedBroadcastAndReduce
+    public class PipelinedBroadcastAndReduce
     {
         private readonly IIMRUClient _imruClient;
 
         [Inject]
-        private PipelinedBroadcastAndReduce(IIMRUClient imruClient)
+        protected PipelinedBroadcastAndReduce(IIMRUClient imruClient)
         {
             _imruClient = imruClient;
         }
@@ -42,7 +44,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Runs the actual broadcast and reduce job
         /// </summary>
-        public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory)
+        public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery)
         {
             var updateFunctionConfig =
                 TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
@@ -76,10 +78,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
 
             var results = _imruClient.Submit<int[], int[], int[], Stream>(
                 new IMRUJobDefinitionBuilder()
-                    .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], int[]>.ConfigurationModule
-                        .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
-                            GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
-                        .Build())
+                    .SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery))
                     .SetUpdateFunctionConfiguration(updateFunctionConfig)
                     .SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
                         .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class)
@@ -99,8 +98,22 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                     .SetJobName("BroadcastReduce")
                     .SetNumberOfMappers(numberofMappers)
                     .SetMapperMemory(mapperMemory)
+                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
                     .SetUpdateTaskMemory(updateTaskMemory)
                     .Build());
         }
+
+        protected virtual IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery)
+        {
+            return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
+                .Build();
+        }
+
+        internal void Run(int v, int chunkSize, int iterations, int dims, int mapperMemory, int updateTaskMemory)
+        {
+            throw new NotImplementedException();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index 18876ab..d002f2d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -19,7 +19,6 @@ using System;
 using System.Globalization;
 using System.Linq;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IO.FileSystem.Hadoop;
 using Org.Apache.REEF.IO.FileSystem.Local;
@@ -30,7 +29,7 @@ using Org.Apache.REEF.Utilities.Logging;
 namespace Org.Apache.REEF.IMRU.Examples
 {
     /// <summary>
-    /// Runs IMRU for mapper count either in localruntime or on cluster.
+    /// Runs IMRU for mapper count either in local runtime or on cluster.
     /// </summary>
     public class Run
     {
@@ -59,13 +58,14 @@ namespace Org.Apache.REEF.IMRU.Examples
             mapperCountExample.Run(numNodes - 1, filename, fileSystemConfig);
         }
 
-        public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string[] args, params string[] runtimeDir)
+        public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, bool faultTolerant, string[] args, params string[] runtimeDir)
         {
             int chunkSize = 2;
             int dims = 10;
-            int iterations = 10;
+            int iterations = 100;
             int mapperMemory = 512;
             int updateTaskMemory = 512;
+            int maxRetryNumberInRecovery = 2;         
 
             if (args.Length > 0)
             {
@@ -92,6 +92,11 @@ namespace Org.Apache.REEF.IMRU.Examples
                 iterations = Convert.ToInt32(args[4]);
             }
 
+            if (args.Length > 5)
+            {
+                maxRetryNumberInRecovery = Convert.ToInt32(args[5]);
+            }
+
             IInjector injector;
 
             if (!runOnYarn)
@@ -105,10 +110,26 @@ namespace Org.Apache.REEF.IMRU.Examples
                 injector = TangFactory.GetTang()
                     .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetYarnIMRUConfiguration(), tcpPortConfig);
             }
-            var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>();
-            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory);
+
+            if (faultTolerant)
+            {
+                var broadcastReduceFtExample = injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>();
+                broadcastReduceFtExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+            }
+            else
+            {
+                var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>();
+                broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+            }
         }
 
+        /// <summary>
+        /// Run IMRU examples from command line
+        /// </summary>
+        /// Sample command line:  
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduce 20000000 1000000 1024 1024 10 2
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduceft 20000000 1000000 1024 1024 100 2
+        /// <param name="args">Command line arguments</param>
         private static void Main(string[] args)
         {
             Logger.Log(Level.Info, "start running client: " + DateTime.Now);
@@ -168,10 +189,16 @@ namespace Org.Apache.REEF.IMRU.Examples
 
                 case "broadcastandreduce":
                     Logger.Log(Level.Info, "Running Broadcast and Reduce");
-                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, args.Skip(5).ToArray());
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, false, args.Skip(5).ToArray());
                     Logger.Log(Level.Info, "Done Running Broadcast and Reduce");
                     return;
 
+                case "broadcastandreduceft":
+                    Logger.Log(Level.Info, "Running Broadcast and Reduce FT");
+                    RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, true, args.Skip(5).ToArray());
+                    Logger.Log(Level.Info, "Done Running Broadcast and Reduce FT");
+                    return;
+
                 default:
                     Logger.Log(Level.Info, "wrong test name");
                     return;

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
index d3d09d6..f2a0dbf 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using NSubstitute;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
@@ -181,10 +182,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         /// <summary>
         /// A Context Manager observer for test
         /// </summary>
-        private sealed class TestContextObserver : IObserver<IDictionary<string, IActiveContext>>
+        private sealed class TestContextObserver : IObserver<IEnumerable<IActiveContext>>
         {
             private readonly int _totalExpected;
-            private IDictionary<string, IActiveContext> _contexts = null;
+            private int _contextCount = 0;
 
             internal TestContextObserver(int totalExpected)
             {
@@ -203,16 +204,12 @@ namespace Org.Apache.REEF.IMRU.Tests
 
             public int NumberOfActiveContextsReceived()
             {
-                if (_contexts != null)
-                {
-                    return _contexts.Count;                    
-                }
-                return 0;
+                return _contextCount;
             }
 
-            public void OnNext(IDictionary<string, IActiveContext> value)
+            public void OnNext(IEnumerable<IActiveContext> value)
             {
-                _contexts = value;
+                _contextCount = value.Count();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
index 489c725..a5c4caa 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
@@ -38,22 +38,22 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestValidAddRemoveAllocatedEvaluator()
         {
-            var evalutorManager = CreateTestEvaluators(3, 1);
-            Assert.Equal(3, evalutorManager.NumberOfAllocatedEvaluators);
-            Assert.True(evalutorManager.AreAllEvaluatorsAllocated());
-            Assert.True(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
-            Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2));
-            Assert.True(evalutorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
-            Assert.False(evalutorManager.IsMasterEvaluatorFailed());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            Assert.Equal(3, evaluatorManager.NumberOfAllocatedEvaluators);
+            Assert.True(evaluatorManager.AreAllEvaluatorsAllocated());
+            Assert.True(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2));
+            Assert.True(evaluatorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
+            Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
 
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            Assert.Equal(2, evalutorManager.NumberOfAllocatedEvaluators);
-            Assert.True(evalutorManager.IsMasterEvaluatorFailed());
-            Assert.Equal(0, evalutorManager.NumberofFailedMappers());
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            Assert.Equal(2, evaluatorManager.NumberOfAllocatedEvaluators);
+            Assert.True(evaluatorManager.IsMasterEvaluatorFailed());
+            Assert.Equal(0, evaluatorManager.NumberofFailedMappers());
 
-            evalutorManager.ResetFailedEvaluators();
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            Assert.True(evalutorManager.AreAllEvaluatorsAllocated());
+            evaluatorManager.ResetFailedEvaluators();
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            Assert.True(evaluatorManager.AreAllEvaluatorsAllocated());
         }
 
         /// <summary>
@@ -62,10 +62,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestNoMasterEvaluator()
         {
-            var evalutorManager = CreateEvaluatorManager(3, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(3, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -75,10 +75,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestTwoMasterEvaluator()
         {
-            var evalutorManager = CreateEvaluatorManager(3, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(3, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -88,10 +88,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestTooManyEvaluators()
         {
-            var evalutorManager = CreateEvaluatorManager(2, 1);
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            var evaluatorManager = CreateEvaluatorManager(2, 1);
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -113,14 +113,14 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestResetFailedEvaluators()
         {
-            var evalutorManager = CreateTestEvaluators(3, 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
-            Assert.Equal(2, evalutorManager.NumberOfMissingEvaluators());
-            evalutorManager.ResetFailedEvaluators();
-            Assert.Equal(0, evalutorManager.NumberofFailedMappers());
-            Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
-            Assert.False(evalutorManager.IsMasterEvaluatorFailed());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.Equal(2, evaluatorManager.NumberOfMissingEvaluators());
+            evaluatorManager.ResetFailedEvaluators();
+            Assert.Equal(0, evaluatorManager.NumberofFailedMappers());
+            Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
         }
 
         /// <summary>
@@ -129,10 +129,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestReachedMaximumNumberOfEvaluatorFailures()
         {
-            var evalutorManager = CreateTestEvaluators(3, 2);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
-            evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
-            Assert.True(evalutorManager.ReachedMaximumNumberOfEvaluatorFailures());
+            var evaluatorManager = CreateTestEvaluators(3, 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.True(evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures());
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index bdcebc2..d35f7c8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -205,7 +205,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError));
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError));
             taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError));
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -254,7 +254,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver));
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -281,7 +281,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -312,7 +312,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -341,7 +341,19 @@ namespace Org.Apache.REEF.IMRU.Tests
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask);
             Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId));
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
+        }
+
+        /// <summary>
+        /// Test the scenario where there is no task associated with the Failed Evaluator. 
+        /// This can happen when submitting a task on a failed evaluator. 
+        /// </summary>
+        [Fact]
+        public void TestFailedEvaluatorWithUnsuccessfullySubmittedTask()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+            taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluatorWithoutTaskId(EvaluatorIdPrefix + ContextIdPrefix + 1));
+            Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1));
         }
 
         /// <summary>
@@ -378,7 +390,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -411,7 +423,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -439,7 +451,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -476,7 +488,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -515,7 +527,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -548,7 +560,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError);
             taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask);
 
-            Assert.True(taskManager.AllInFinalState());
+            Assert.True(taskManager.AreAllTasksInFinalState());
         }
 
         /// <summary>
@@ -633,15 +645,19 @@ namespace Org.Apache.REEF.IMRU.Tests
                 case TaskManager.TaskGroupCommunicationError:
                     taskException = new IMRUTaskGroupCommunicationException(errorMsg);
                     break;
-                default:
+                case TaskManager.TaskSystemError:
                     taskException = new IMRUTaskSystemException(errorMsg);
                     break;
+                default:
+                    taskException = new IMRUTaskAppException(errorMsg);
+                    break;
             }
 
             IFailedTask failedtask = Substitute.For<IFailedTask>();
             failedtask.Id.Returns(taskId);
             failedtask.Message.Returns(errorMsg);
             failedtask.AsError().Returns(taskException);
+            failedtask.GetActiveContext().Returns(Optional<IActiveContext>.Empty());
             return failedtask;
         }
 
@@ -684,6 +700,20 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
+        /// Creates a mock IFailedEvaluator with no task id associated
+        /// This is to simulate the case where task is submitted on a failed evaluator. 
+        /// </summary>
+        /// <param name="evaluatorId">Id that the mock failed evaluator reports</param>
+        /// <returns>A mock IFailedEvaluator whose FailedTask is empty</returns>
+        private static IFailedEvaluator CreateMockFailedEvaluatorWithoutTaskId(string evaluatorId)
+        {
+            var failedEvalutor = Substitute.For<IFailedEvaluator>();
+            failedEvalutor.Id.Returns(evaluatorId);
+            failedEvalutor.FailedTask.Returns(Optional<IFailedTask>.Empty());
+            return failedEvalutor;
+        }
+
+        /// <summary>
         /// Creates a mock IConfiguration
         /// </summary>
         /// <returns></returns>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 5ea8d23..d42bf9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -41,6 +41,7 @@ namespace Org.Apache.REEF.IMRU.API
         private readonly int _updateTaskMemory;
         private readonly int _mapTaskCores;
         private readonly int _updateTaskCores;
+        private readonly int _maxRetryNumberInRecovery;
         private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig;
         private readonly bool _invokeGC;
 
@@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API
         /// <param name="updateTaskMemory">Update task memory</param>
         /// <param name="mapTaskCores">Number of map task cores</param>
         /// <param name="updateTaskCores">Number of update task cores</param>
+        /// <param name="maxRetryNumberInRecovery">Max number of retries done if first run of IMRU job failed</param>
         /// <param name="jobName">Job name</param>
         /// <param name="invokeGC">Whether to call garbage collector after each iteration</param>
         internal IMRUJobDefinition(
@@ -84,6 +86,7 @@ namespace Org.Apache.REEF.IMRU.API
             int updateTaskMemory,
             int mapTaskCores,
             int updateTaskCores,
+            int maxRetryNumberInRecovery,
             string jobName,
             bool invokeGC)
         {
@@ -101,6 +104,7 @@ namespace Org.Apache.REEF.IMRU.API
             _updateTaskMemory = updateTaskMemory;
             _mapTaskCores = mapTaskCores;
             _updateTaskCores = updateTaskCores;
+            _maxRetryNumberInRecovery = maxRetryNumberInRecovery;
             _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig;
             _invokeGC = invokeGC;
             _resultHandlerConfiguration = resultHandlerConfiguration;
@@ -223,6 +227,14 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Max number of retries done if first run of IMRU job failed.
+        /// </summary>
+        internal int MaxRetryNumberInRecovery
+        {
+            get { return _maxRetryNumberInRecovery; }
+        }
+
+        /// <summary>
         /// Per mapper configuration
         /// </summary>
         internal ISet<IConfiguration> PerMapConfigGeneratorConfig

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index f078c4a..5d56fde 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -38,6 +38,7 @@ namespace Org.Apache.REEF.IMRU.API
         private int _updateTaskMemory;
         private int _coresPerMapper;
         private int _updateTaskCores;
+        private int _maxRetryNumberInRecovery;
         private IConfiguration _mapFunctionConfiguration;
         private IConfiguration _mapInputCodecConfiguration;
         private IConfiguration _updateFunctionCodecsConfiguration;
@@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API
             _updateTaskMemory = 512;
             _coresPerMapper = 1;
             _updateTaskCores = 1;
+            _maxRetryNumberInRecovery = 0;
             _invokeGC = true;
             _perMapConfigGeneratorConfig = new HashSet<IConfiguration>();
         }
@@ -233,6 +235,17 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Set max number of retries done if first run of IMRU job failed.
+        /// </summary>
+        /// <param name="maxRetryNumberInRecovery">Max number of retries</param>
+        /// <returns>This builder instance</returns>
+        public IMRUJobDefinitionBuilder SetMaxRetryNumberInRecovery(int maxRetryNumberInRecovery)
+        {
+            _maxRetryNumberInRecovery = maxRetryNumberInRecovery;
+            return this;
+        }
+
+        /// <summary>
         /// Sets Per Map Configuration
         /// </summary>
         /// <param name="perMapperConfig">Mapper configs</param>
@@ -320,6 +333,7 @@ namespace Org.Apache.REEF.IMRU.API
                 _updateTaskMemory,
                 _coresPerMapper,
                 _updateTaskCores,
+                _maxRetryNumberInRecovery,
                 _jobName,
                 _invokeGC);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 969a874..9a256e0 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -104,6 +104,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                         GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnTaskFailed,
                         GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                    .Set(DriverConfiguration.OnTaskRunning,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                     .Set(DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
                     .Build(),
                 TangFactory.GetTang().NewConfigurationBuilder()
@@ -144,6 +146,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                     jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(CoresForUpdateTask),
                     jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
+                    jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(InvokeGC),
                     jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
                 .Build();

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
index 219a9f6..437b76f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
@@ -32,12 +32,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// Manages active contexts for the driver
     /// </summary>
     [NotThreadSafe]
-    internal sealed class ActiveContextManager : IDisposable
+    internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager));
         private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>();
         private readonly int _totalExpectedContexts;
-        private IObserver<IDictionary<string, IActiveContext>> _activeContextObserver;
+        private IObserver<IEnumerable<IActiveContext>> _activeContextObserver;
 
         /// <summary>
         /// Constructor of ActiveContextManager
@@ -71,7 +71,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         /// <param name="activeContextObserver"></param>
         /// <returns></returns>
-        public IDisposable Subscribe(IObserver<IDictionary<string, IActiveContext>> activeContextObserver)
+        public IDisposable Subscribe(IObserver<IEnumerable<IActiveContext>> activeContextObserver)
         {
             if (_activeContextObserver != null)
             {
@@ -84,7 +84,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Checks if all the requested contexts are received. 
         /// </summary>
-        private bool AreAllContextsReceived
+        internal bool AreAllContextsReceived
         {
             get { return _totalExpectedContexts == NumberOfActiveContexts; }
         }
@@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             if (AreAllContextsReceived && _activeContextObserver != null)
             {
-                _activeContextObserver.OnNext(_activeContexts);
+                _activeContextObserver.OnNext(_activeContexts.Values);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
index 8fa9876..5f6856c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
@@ -201,7 +201,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Records failed Evaluator
         /// Removes it from allocated Evaluator and adds it to the failed Evaluators collection
-        /// If the evaluatorId is not in _failedEvaluators, throw IMRUSystemException
+        /// If the evaluatorId is already in _failedEvaluators, throw IMRUSystemException
         /// </summary>
         /// <param name="evaluatorId"></param>
         internal void RecordFailedEvaluator(string evaluatorId)
@@ -217,11 +217,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Remove failed evaluator from the collection
+        /// </summary>
+        /// <param name="evaluatorId">Id of the evaluator to remove from the recorded failed evaluators</param>
+        internal void RemoveFailedEvaluator(string evaluatorId)
+        {
+            if (!_failedEvaluatorIds.Contains(evaluatorId))
+            {
+                string msg = string.Format("The failed evaluator {0} is not recorded in list of failed evaluators.", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            _failedEvaluatorIds.Remove(evaluatorId);
+        }
+
+        /// <summary>
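-        /// Checks if the number of failed Evaluators has reached allowed maximum number of evaluator failures 
+        /// Checks if the number of failed Evaluators has exceeded allowed maximum number of evaluator failures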
         /// </summary>
-        internal bool ReachedMaximumNumberOfEvaluatorFailures()
+        internal bool ExceededMaximumNumberOfEvaluatorFailures()
         {
-            return _failedEvaluatorIds.Count >= AllowedNumberOfEvaluatorFailures;
+            return _failedEvaluatorIds.Count > AllowedNumberOfEvaluatorFailures;
         }
 
         /// <summary>