You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by dh...@apache.org on 2016/07/12 22:53:19 UTC

reef git commit: [REEF-1466] Cancel the blocking message reading and close the task properly

Repository: reef
Updated Branches:
  refs/heads/master 2de3998cb -> 026b5ea60


[REEF-1466] Cancel the blocking message reading and close the task properly

Add cancellation token to GC and Network API as an optional parameter
Update IMRU tasks to pass cancellation token
Update IMRU close handler to cancel the token
Test case update and new test cases

JIRA: [REEF-1466](https://issues.apache.org/jira/browse/REEF-1466)

Pull Request: Closes #1052


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/026b5ea6
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/026b5ea6
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/026b5ea6

Branch: refs/heads/master
Commit: 026b5ea6038cac6f9e05d24654e43d3e97db6b03
Parents: 2de3998
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Jul 5 20:04:24 2016 -0700
Committer: dhruv <dh...@apache.org>
Committed: Tue Jul 12 15:48:45 2016 -0700

----------------------------------------------------------------------
 .../OnREEF/IMRUTasks/MapTaskHost.cs             |  49 +++++-
 .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs    |  57 ++-----
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  88 +++++++----
 .../EnforceCloseTimeoutMilliseconds.cs          |  31 ----
 .../Org.Apache.REEF.IMRU.csproj                 |   1 -
 .../GroupCommunicationTests.cs                  |  98 ++++++++++++
 .../GroupCommunicationTreeTopologyTests.cs      |  90 +++++++++++
 .../Group/Operators/IBroadcastReceiver.cs       |   5 +-
 .../Group/Operators/IReduceReceiver.cs          |   8 +-
 .../Group/Operators/IReduceSender.cs            |   6 +-
 .../Group/Operators/IScatterReceiver.cs         |   4 +-
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   6 +-
 .../Group/Operators/Impl/ReduceReceiver.cs      |   6 +-
 .../Group/Operators/Impl/ReduceSender.cs        |   8 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |   7 +-
 .../Group/Task/IOperatorTopology.cs             |  12 +-
 .../Group/Task/Impl/ChildNodeContainer.cs       |  10 +-
 .../Group/Task/Impl/NodeStruct.cs               |  20 ++-
 .../Group/Task/Impl/OperatorTopology.cs         |  24 +--
 .../Functional/Bridge/TestCloseTask.cs          | 153 +++++--------------
 .../Functional/IMRU/IMRUCloseTaskTest.cs        | 134 +++++++++++-----
 .../Functional/ReefFunctionalTest.cs            |   8 +-
 22 files changed, 531 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index 86b1f7c..bce1e4d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.IO;
 using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
@@ -29,6 +30,7 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -63,6 +65,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
+        /// The cancellation token source used to cancel the group communication operations
+        /// </summary>
+        private readonly CancellationTokenSource _cancellationSource;
+
+        /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the communications.</param>
@@ -83,6 +90,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _dataReducer = cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
             _taskCloseCoordinator = taskCloseCoordinator;
+            _cancellationSource = new CancellationTokenSource();
         }
 
         /// <summary>
@@ -94,7 +102,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         {
             MapControlMessage controlMessage = MapControlMessage.AnotherRound;
 
-            while (!_taskCloseCoordinator.ShouldCloseTask() && controlMessage != MapControlMessage.Stop)
+            while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop)
             {
                 if (_invokeGC)
                 {
@@ -103,18 +111,45 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     GC.WaitForPendingFinalizers();
                 }
 
-                using (
-                    MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive())
+                try
+                {
+                    using (
+                    MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive(_cancellationSource))
+                    {
+                        controlMessage = mapInput.ControlMessage;
+                        if (controlMessage != MapControlMessage.Stop)
+                        {
+                            _dataReducer.Send(_mapTask.Map(mapInput.Message), _cancellationSource);
+                        }
+                    }
+                }
+                catch (OperationCanceledException e)
+                {
+                    Logger.Log(Level.Warning, "Received OperationCanceledException in MapTaskHost with message: {0}.", e.Message);
+                    break;
+                }
+                catch (IOException e)
+                {
+                    Logger.Log(Level.Error, "Received IOException in MapTaskHost with message: {0}.", e.Message);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    }
+                    break;
+                }
+                catch (TcpClientConnectionException e)
                 {
-                    controlMessage = mapInput.ControlMessage;
-                    if (controlMessage != MapControlMessage.Stop)
+                    Logger.Log(Level.Error, "Received TcpClientConnectionException in MapTaskHost with message: {0}.", e.Message);
+                    if (!_cancellationSource.IsCancellationRequested)
                     {
-                        _dataReducer.Send(_mapTask.Map(mapInput.Message));
+                        throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
                     }
+                    break;
                 }
             }
 
             _taskCloseCoordinator.SignalTaskStopped();
+            Logger.Log(Level.Info, "MapTaskHost returned with cancellation token:{0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }
 
@@ -124,7 +159,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            _taskCloseCoordinator.HandleEvent(closeEvent);
+            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
index f60271a..a9014c3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
@@ -29,8 +29,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
     /// <summary>
     /// This class provides a method to handle Task close event. It is called from TaskCloseEventHandler. 
-    /// It also wraps flags to represent if the task should be closed and if the task has been stopped
-    /// so that to provide a coordination between the task and the close handler.  
     /// </summary>
     [ThreadSafe]
     internal sealed class TaskCloseCoordinator
@@ -38,74 +36,41 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private static readonly Logger Logger = Logger.GetLogger(typeof(TaskCloseCoordinator));
 
         /// <summary>
-        /// When a close event is received, this variable is set to 1. At the beginning of each task iteration,
-        /// if this variable is set to 1, the task will break from the loop and return from the Call() method.
-        /// </summary>
-        private long _shouldCloseTask = 0;
-
-        /// <summary>
-        /// Waiting time for the task to close by itself
-        /// </summary>
-        private readonly int _enforceCloseTimeoutMilliseconds;
-
-        /// <summary>
-        /// An event that will wait in close handler until it is either signaled from Call method or timeout.
+        /// An event that the close handler waits on until it is signaled from the Call method.
         /// </summary>
         private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false);
 
         /// <summary>
-        /// Handle task close event and manage the states, wait/signal when closing the task
+        /// Handle task close event, wait/signal when closing the task
         /// </summary>
-        /// <param name="enforceCloseTimeoutMilliseconds">Timeout in milliseconds to enforce the task to close if receiving task close event</param>
         [Inject]
-        private TaskCloseCoordinator([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds)
+        private TaskCloseCoordinator()
         {
-            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
         }
 
         /// <summary>
         /// Handle Task close event.
-        /// Set _shouldCloseTask to 1 so that to inform the task to stop at the end of the current iteration.
-        /// Then waiting for the signal from Call method. Either it is signaled or after _enforceCloseTimeoutMilliseconds,
-        /// If the closed event is sent from driver, checks if the _waitToCloseEvent has been signaled. If not, throw 
-        /// IMRUTaskSystemException to enforce the task to stop.
+        /// Cancels the given CancellationTokenSource for the data reading operation, then waits for the signal from the Call method.
         /// </summary>
         /// <param name="closeEvent"></param>
-        internal void HandleEvent(ICloseEvent closeEvent)
+        /// <param name="cancellationTokenSource"></param>
+        internal void HandleEvent(ICloseEvent closeEvent, CancellationTokenSource cancellationTokenSource)
         {
-            Interlocked.Exchange(ref _shouldCloseTask, 1);
-            var taskSignaled = _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+            cancellationTokenSource.Cancel();
+            _waitToCloseEvent.Wait();
 
             if (closeEvent.Value.IsPresent())
             {
-                var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
-                if (msg.Equals(TaskManager.CloseTaskByDriver))
-                {
-                    Logger.Log(Level.Info, "The task received close event with message: {0}.", msg);
-
-                    if (!taskSignaled)
-                    {
-                        throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                    }
-                }
+                Logger.Log(Level.Info, "The task received close event with message: {0}.", Encoding.UTF8.GetString(closeEvent.Value.Value));
             }
             else
             {
-                Logger.Log(Level.Warning, "The task received close event with no message.");
+                Logger.Log(Level.Info, "The task received close event with no message.");
             }
         }
 
         /// <summary>
-        /// Indicates if the task should be stopped.
-        /// </summary>
-        /// <returns></returns>
-        internal bool ShouldCloseTask()
-        {
-            return Interlocked.Read(ref _shouldCloseTask) == 1;
-        }
-
-        /// <summary>
-        /// Called from Task right before the task is returned to signals _waitToCloseEvent. 
+        /// Called from the Task right before it returns, to signal _waitToCloseEvent.
         /// </summary>
         internal void SignalTaskStopped()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 116bc63..af4fbf1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -16,7 +16,7 @@
 // under the License.
 
 using System;
-using System.Text;
+using System.IO;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
@@ -29,6 +29,7 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -65,6 +66,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
+        /// The cancellation token source used to cancel the group communication operations
+        /// </summary>
+        private readonly CancellationTokenSource _cancellationSource;
+
+        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the communications.</param>
@@ -88,6 +94,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _invokeGC = invokeGC;
             _resultHandler = resultHandler;
             _taskCloseCoordinator = taskCloseCoordinator;
+            _cancellationSource = new CancellationTokenSource();
         }
 
         /// <summary>
@@ -99,44 +106,69 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         {
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
-
-            while (updateResult.HasMapInput && !_taskCloseCoordinator.ShouldCloseTask())
+            try
             {
-                iterNo++;
-
-                using (
-                    var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
-                        MapControlMessage.AnotherRound))
+                while (updateResult.HasMapInput && !_cancellationSource.IsCancellationRequested)
                 {
-                    _dataAndControlMessageSender.Send(message);
+                    iterNo++;
+
+                    using (
+                        var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
+                            MapControlMessage.AnotherRound))
+                    {
+                        _dataAndControlMessageSender.Send(message);
+                    }
+
+                    var input = _dataReceiver.Reduce(_cancellationSource);
+
+                    if (_invokeGC)
+                    {
+                        Logger.Log(Level.Verbose, "Calling Garbage Collector");
+                        GC.Collect();
+                        GC.WaitForPendingFinalizers();
+                    }
+
+                    updateResult = _updateTask.Update(input);
+
+                    if (updateResult.HasResult)
+                    {
+                        _resultHandler.HandleResult(updateResult.Result);
+                    }
                 }
-
-                var input = _dataReceiver.Reduce();
-
-                if (_invokeGC)
+                if (!_cancellationSource.IsCancellationRequested)
                 {
-                    Logger.Log(Level.Verbose, "Calling Garbage Collector");
-                    GC.Collect();
-                    GC.WaitForPendingFinalizers();
+                    MapInputWithControlMessage<TMapInput> stopMessage =
+                        new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
+                    _dataAndControlMessageSender.Send(stopMessage);
                 }
-
-                updateResult = _updateTask.Update(input);
-
-                if (updateResult.HasResult)
+            }
+            catch (OperationCanceledException e)
+            {
+                Logger.Log(Level.Warning,
+                    "Received OperationCanceledException in UpdateTaskHost with message: {0}.",
+                    e.Message);
+            }
+            catch (IOException e)
+            {
+                Logger.Log(Level.Error, "Received IOException in UpdateTaskHost with message: {0}.", e.Message);
+                if (!_cancellationSource.IsCancellationRequested)
                 {
-                    _resultHandler.HandleResult(updateResult.Result);
+                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
                 }
             }
-
-            if (!_taskCloseCoordinator.ShouldCloseTask())
+            catch (TcpClientConnectionException e)
             {
-                MapInputWithControlMessage<TMapInput> stopMessage =
-                    new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
-                _dataAndControlMessageSender.Send(stopMessage);
+                Logger.Log(Level.Error,
+                    "Received TcpClientConnectionException in UpdateTaskHost with message: {0}.",
+                    e.Message);
+                if (!_cancellationSource.IsCancellationRequested)
+                {
+                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                }
             }
-
             _resultHandler.Dispose();
             _taskCloseCoordinator.SignalTaskStopped();
+            Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }
 
@@ -146,7 +178,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            _taskCloseCoordinator.HandleEvent(closeEvent);
+            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
deleted file mode 100644
index e177895..0000000
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// 
-//   http://www.apache.org/licenses/LICENSE-2.0
-// 
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
-{
-    /// <summary>
-    /// When driver sends close event to a task, it would expect the task to close gracefully. 
-    /// After specified time out, if the task is still not closed, the close handler will throw exception, 
-    /// enforce the task to close after waiting for this much time (in milliseconds). 
-    /// </summary>
-    [NamedParameter("Enforce the task to close after waiting for this much time (in milliseconds).", "EnforceCloseTimeout", "1000")]
-    internal sealed class EnforceCloseTimeoutMilliseconds : Name<int>
-    {
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index cdf87cc..30d110a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -100,7 +100,6 @@ under the License.
     <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs" />
-    <Compile Include="OnREEF\Parameters\EnforceCloseTimeoutMilliseconds.cs" />
     <Compile Include="OnREEF\Parameters\InvokeGC .cs" />
     <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 5eefd44..2f5fe8a 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -380,6 +380,53 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(value, receiver2.Receive());
         }
 
+        /// <summary>
+        /// Test IBroadcastReceiver.Receive() with cancellation token
+        /// </summary>
+        [Fact]
+        public void TestBroadcastOperatorWithCancelation()
+        {
+            string groupName = "group1";
+            string operatorName = "broadcast";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 10;
+            int fanOut = 3;
+
+            IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+            var commGroup = groupCommDriver.DefaultGroup
+                .AddBroadcast(operatorName, masterTaskId)
+                .Build();
+
+            List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
+
+            IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
+            IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
+            IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+
+            var token = new CancellationTokenSource();
+            var taskThread1 = new Thread(() =>
+            {
+                Action receive = () => receiver1.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            var taskThread2 = new Thread(() =>
+            {
+                Action receive = () => receiver2.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            taskThread1.Start();
+            taskThread2.Start();
+            token.Cancel();
+        }
+
         [Fact]
         public void TestBroadcastOperatorWithDefaultCodec()
         {
@@ -613,6 +660,57 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(4, receiver4.Receive().Single());
         }
 
+        /// <summary>
+        /// Test IScatterReceiver.Receive() with a cancellation token, both when data is available and when the token is cancelled.
+        /// </summary>
+        [Fact]
+        public void TestScatterOperatorWithCancellation()
+        {
+            string groupName = "group1";
+            string operatorName = "scatter";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 5;
+            int fanOut = 2;
+
+            IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+            var commGroup = groupCommDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId)
+                .Build();
+
+            List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
+
+            IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
+            IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+            Assert.NotNull(receiver3);
+            Assert.NotNull(receiver4);
+
+            List<int> data = new List<int> { 1, 2, 3, 4 };
+            var token = new CancellationTokenSource();
+            sender.Send(data);
+            Assert.Equal(1, receiver1.Receive(token).Single());
+            Assert.Equal(2, receiver2.Receive(token).Single());
+            Assert.Equal(3, receiver3.Receive(token).Single());
+            Assert.Equal(4, receiver4.Receive(token).Single());
+
+            var taskThread = new Thread(() =>
+            {
+                Action receive = () => receiver1.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            taskThread.Start();
+            token.Cancel();
+        }
+
         [Fact]
         public void TestScatterOperator2()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 45c9bbd..ee037c8 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -635,6 +637,94 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(sum, 6325);
         }
 
+        /// <summary>
+        /// Test IReduceSender.Send() and IReduceReceiver.Reduce() with and without cancellation token
+        /// </summary>
+        [Fact]
+        public void TestScatterReduceOperatorsWithCancelation()
+        {
+            string groupName = "group1";
+            string scatterOperatorName = "scatter";
+            string reduceOperatorName = "reduce";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 5;
+            int fanOut = 2;
+
+            var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+            ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
+              .AddScatter<int>(
+                    scatterOperatorName,
+                    masterTaskId,
+                    TopologyTypes.Tree,
+                    GetDefaultDataConverterConfig())
+                .AddReduce<int>(
+                    reduceOperatorName,
+                    masterTaskId,
+                    TopologyTypes.Tree,
+                    GetDefaultDataConverterConfig(),
+                    GetDefaultReduceFuncConfig())
+                .Build();
+
+            var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
+
+            IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
+            IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+            Assert.NotNull(receiver3);
+            Assert.NotNull(receiver4);
+
+            List<int> data = Enumerable.Range(1, 100).ToList();
+
+            sender.Send(data);
+
+            List<int> data1 = receiver1.Receive();
+            List<int> data2 = receiver2.Receive();
+
+            List<int> data3 = receiver3.Receive();
+            List<int> data4 = receiver4.Receive();
+
+            int sum3 = data3.Sum();
+            sumSender3.Send(sum3);
+
+            int sum4 = data4.Sum();
+            sumSender4.Send(sum4);
+
+            int sum2 = data2.Sum();
+            sumSender2.Send(sum2);
+
+            int sum1 = data1.Sum();
+
+            var token = new CancellationTokenSource();
+            token.Cancel();
+            Action send = () => sumSender1.Send(sum1, token);
+            Assert.Throws<OperationCanceledException>(send);
+
+            var taskThread = new Thread(() =>
+            {
+                Action reduce = () => sumReducer.Reduce(token);
+                Assert.Throws<OperationCanceledException>(reduce);
+            });
+
+            taskThread.Start();
+            token.Cancel();
+        }
+
         private IConfiguration GetDefaultCodecConfig()
         {
             return StreamingCodecConfiguration<int>.Conf

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
index 209e6fd..da9a0ec 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -26,7 +28,8 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// <summary>
         /// Receive a message from parent BroadcastSender.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The incoming message</returns>
-        T Receive();
+        T Receive(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
index 1490834..89db753 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -26,13 +28,15 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// <summary>
         /// Returns the class used to reduce incoming messages sent by ReduceSenders.
         /// </summary>
-        IReduceFunction<T> ReduceFunction { get; } 
+        IReduceFunction<T> ReduceFunction { get; }
 
         /// <summary>
         /// Receives messages sent by all ReduceSenders and aggregates them
         /// using the specified IReduceFunction.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The single aggregated data</returns>
-        T Reduce();
+        //// TODO : REEF-1489 to remove null
+        T Reduce(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
index 6c3aca6..f8d22d4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -27,6 +29,8 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// Get reduced data from children, reduce with the data given, then sends reduced data to parent
         /// </summary>
         /// <param name="data">The data to send</param>
-        void Send(T data);
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
+        //// TODO : REEF-1489 to remove null
+        void Send(T data, CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
index 9b313e4..5b622b9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 
 namespace Org.Apache.REEF.Network.Group.Operators
 {
@@ -30,6 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// Receive a sublist of messages sent from the IScatterSender.
         /// </summary>
         /// <returns>The sublist of messages</returns>
-        List<T> Receive();
+        //// TODO : REEF-1489 to remove null
+        List<T> Receive(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index bc72fea..cad774d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -87,15 +88,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Receive a message from parent BroadcastSender.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The incoming message</returns>
-        public T Receive()
+        public T Receive(CancellationTokenSource cancellationSource = null)
         {
             PipelineMessage<T> message;
             var messageList = new List<PipelineMessage<T>>();
 
             do
             {
-                message = _topology.ReceiveFromParent();
+                message = _topology.ReceiveFromParent(cancellationSource);
 
                 if (_topology.HasChildren())
                 {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index e94c1ea..fb129da 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
@@ -99,15 +100,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Receives messages sent by all ReduceSenders and aggregates them
         /// using the specified IReduceFunction.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The single aggregated data</returns>
-        public T Reduce()
+        public T Reduce(CancellationTokenSource cancellationSource = null)
         {
             PipelineMessage<T> message;
             var messageList = new List<PipelineMessage<T>>();
 
             do
             {
-                message = _topology.ReceiveFromChildren(_pipelinedReduceFunc);
+                message = _topology.ReceiveFromChildren(_pipelinedReduceFunc, cancellationSource);
                 messageList.Add(message);
             } 
             while (!message.IsLast);

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 813db3e..76202b2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -16,8 +16,8 @@
 // under the License.
 
 using System;
-using System.Reactive;
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -50,7 +50,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The Task's operator topology graph</param>
-        /// <param name="networkHandler">The handler used to handle incoming messages</param>
         /// <param name="reduceFunction">The function used to reduce the incoming messages</param>
         /// <param name="dataConverter">The converter used to convert original
         /// message to pipelined ones and vice versa.</param>
@@ -105,7 +104,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Sends the data to the operator's ReduceReceiver to be aggregated.
         /// </summary>
         /// <param name="data">The data to send</param>
-        public void Send(T data)
+        /// <param name="cancellationSource">The cancellationSource used to cancel the operation</param>
+        public void Send(T data, CancellationTokenSource cancellationSource = null)
         {
             var messageList = PipelineDataConverter.PipelineMessage(data);
 
@@ -118,7 +118,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             {
                 if (_topology.HasChildren())
                 {
-                    var reducedValueOfChildren = _topology.ReceiveFromChildren(_pipelinedReduceFunc);
+                    var reducedValueOfChildren = _topology.ReceiveFromChildren(_pipelinedReduceFunc, cancellationSource);
 
                     var mergeddData = new List<PipelineMessage<T>> { message };
 

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index d6fdfa1..c9d3c49 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -17,6 +17,7 @@
 
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -84,15 +85,15 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Receive a sublist of messages sent from the IScatterSender.
         /// </summary>
         /// <returns>The sublist of messages</returns>
-        public List<T> Receive()
+        public List<T> Receive(CancellationTokenSource cancellationSource = null)
         {
-            IList<T> elements = _topology.ReceiveListFromParent();
+            IList<T> elements = _topology.ReceiveListFromParent(cancellationSource);
             _topology.ScatterToChildren(elements, MessageType.Data);
             return elements.ToList();
         }
 
         /// <summary>
-        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// Ensure all parent and children nodes in the topology are registered with the Name Service.
         /// </summary>
         void IGroupCommOperatorInternal.WaitForRegistration()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
index bc9a1be..97877e8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 
@@ -79,22 +80,27 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// <summary>
         /// Receive an incoming message from the parent Task.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The parent Task's message</returns>
-        T ReceiveFromParent();
+        //// TODO : REEF-1489 to remove null
+        T ReceiveFromParent(CancellationTokenSource cancellationSource = null);
 
         /// <summary>
         /// Receive a list of incoming messages from the parent Task.
         /// </summary>
         /// <returns>The parent Task's list of messages</returns>
-        IList<T> ReceiveListFromParent();
+        //// TODO : REEF-1489 to remove null
+        IList<T> ReceiveListFromParent(CancellationTokenSource cancellationSource = null);
 
         /// <summary>
         /// Receives all messages from child Tasks and reduces them with the
         /// given IReduceFunction.
         /// </summary>
         /// <param name="reduceFunction">The class used to reduce messages</param>
+        /// <param name="cancellationSource">The cancellationSource to cancel the operation</param>
         /// <returns>The result of reducing messages</returns>
-        T ReceiveFromChildren(IReduceFunction<T> reduceFunction);
+        //// TODO : REEF-1489 to remove null
+        T ReceiveFromChildren(IReduceFunction<T> reduceFunction, CancellationTokenSource cancellationSource = null);
 
         /// <summary>
         /// Checks if the node has children

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
index 297212e..37c91a4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
@@ -18,7 +18,9 @@
 using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -28,6 +30,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     [NotThreadSafe]
     internal sealed class ChildNodeContainer<T> : IEnumerable<NodeStruct<T>>
     {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(ChildNodeContainer<T>));
+
         private readonly Dictionary<string, NodeStruct<T>> _childIdToNodeMap = 
             new Dictionary<string, NodeStruct<T>>();
 
@@ -68,9 +72,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Gets the data from all children nodes synchronously.
         /// </summary>
-        public IEnumerable<T> GetDataFromAllChildren()
+        /// <param name="cancellationSource">The cancellation token source used to cancel the GetData operation</param>
+        public IEnumerable<T> GetDataFromAllChildren(CancellationTokenSource cancellationSource = null)
         {
-            return this.SelectMany(child => child.GetData());
+            var r = this.SelectMany(child => child.GetData(cancellationSource));
+            return r;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index 2140c61..ddc7e8c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Concurrent;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -27,6 +30,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// <typeparam name="T"> Generic type of message</typeparam>
     internal sealed class NodeStruct<T>
     {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(NodeStruct<T>));
+
         private readonly BlockingCollection<GroupCommunicationMessage<T>> _messageQueue;
 
         /// <summary>
@@ -62,10 +67,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Gets the first message in the message queue.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The first available message.</returns>
-        internal T[] GetData()
+        internal T[] GetData(CancellationTokenSource cancellationSource = null)
         {
-            return _messageQueue.Take().Data;
+            if (cancellationSource == null || !cancellationSource.IsCancellationRequested)
+            {
+                var r = cancellationSource == null
+                    ? _messageQueue.Take().Data
+                    : _messageQueue.Take(cancellationSource.Token).Data;
+                return r;
+            }
+            else
+            {
+                throw new OperationCanceledException("GetData operation is canceled");
+            }
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 66faa29..c4d4d0a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -62,8 +62,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="groupName">The name of the operator's Communication Group</param>
         /// <param name="taskId">The operator's Task identifier</param>
         /// <param name="timeout">Timeout value for cancellation token</param>
-        /// <param name="retryCount">Number of times to retry wating for registration</param>
-        /// <param name="sleepTime">Sleep time between retry wating for registration</param>
+        /// <param name="retryCount">Number of times to retry waiting for registration</param>
+        /// <param name="sleepTime">Sleep time between retry waiting for registration</param>
         /// <param name="rootId">The identifier for the root Task in the topology graph</param>
         /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
         /// <param name="networkObserver"></param>
@@ -244,10 +244,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Receive an incoming message from the parent Task.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The parent Task's message</returns>
-        public T ReceiveFromParent()
+        public T ReceiveFromParent(CancellationTokenSource cancellationSource = null)
         {
-            T[] data = _parent.GetData();
+            T[] data = _parent.GetData(cancellationSource);
             if (data == null || data.Length != 1)
             {
                 throw new InvalidOperationException("Cannot receive data from parent node");
@@ -256,9 +257,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             return data[0];
         }
 
-        public IList<T> ReceiveListFromParent()
+        /// <summary>
+        /// Receive a list of incoming messages from the parent Task.
+        /// </summary>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
+        /// <returns>The parent Task's list of messages</returns>
+        public IList<T> ReceiveListFromParent(CancellationTokenSource cancellationSource = null)
         {
-            T[] data = _parent.GetData();
+            T[] data = _parent.GetData(cancellationSource);
             if (data == null || data.Length == 0)
             {
                 throw new InvalidOperationException("Cannot receive data from parent node");
@@ -272,15 +278,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// given IReduceFunction.
         /// </summary>
         /// <param name="reduceFunction">The class used to reduce messages</param>
+        /// <param name="cancellationSource">The cancellation token source used to cancel the data reading operation</param>
         /// <returns>The result of reducing messages</returns>
-        public T ReceiveFromChildren(IReduceFunction<T> reduceFunction)
+        public T ReceiveFromChildren(IReduceFunction<T> reduceFunction, CancellationTokenSource cancellationSource = null)
         {
             if (reduceFunction == null)
             {
                 throw new ArgumentNullException("reduceFunction");
             }
 
-            return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren());
+            return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren(cancellationSource));
         }
 
         public bool HasChildren()
@@ -297,7 +304,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
                 _selfId, node.Identifier, message);
-
             _sender.Send(gcm);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index d50489c..5f08b56 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Text;
 using System.Threading;
@@ -29,7 +30,6 @@ using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         private const string CompletedValidationMessage = "CompletedValidationmessage";
         private const string FailToCloseTaskMessage = "FailToCloseTaskMessage";
         private const string BreakTaskMessage = "BreakTaskMessage";
+        private const string CancelTaskMessage = "CancelTaskMessage";
         private const string EnforceToCloseMessage = "EnforceToCloseMessage";
 
         /// <summary>
@@ -74,35 +75,18 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
-        /// This test is to close a running task and enforce it to break and return after the current iteration
+        /// This test is to close a running task with a CancellationToken
         /// </summary>
         [Fact]
-        public void TestBreakTaskOnLocalRuntime()
+        public void TestCancelTaskWithTaskCloseCoordinatorOnLocalRuntime()
         {
             string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForBreakTask()), typeof(CloseTaskTestDriver), 1, "TestBreakTask", "local", testFolder);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForCancellationTask()), typeof(CloseTaskTestDriver), 1, "TestBreakTask", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
             ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1);
             var messages = new List<string>();
             messages.Add(DisposeMessageFromDriver);
-            messages.Add(BreakTaskMessage);
-            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1);
-            CleanUp(testFolder);
-        }
-
-        /// <summary>
-        /// This test is to close a running task and enforce it to break and return after the current iteration
-        /// </summary>
-        [Fact]
-        public void TestEnforceCloseTaskOnLocalRuntime()
-        {
-            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, "TestEnforceCloseTask", "local", testFolder);
-            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
-            ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 0);
-            var messages = new List<string>();
-            messages.Add(DisposeMessageFromDriver);
-            messages.Add(EnforceToCloseMessage);
+            messages.Add(CancelTaskMessage);
             ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1);
             CleanUp(testFolder);
         }
@@ -149,26 +133,12 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Build();
         }
 
-        private IConfiguration GetTaskConfigurationForBreakTask()
+        private IConfiguration GetTaskConfigurationForCancellationTask()
         {
             return TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "TaskID")
-                .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Build();
-        }
-        private IConfiguration GetTaskConfigurationForEnforceToCloseTask()
-        {
-            var taskConfig = TaskConfiguration.ConfigurationModule
-                .Set(TaskConfiguration.Identifier, "TaskID-EnforceToClose")
-                .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Build();
-
-            return TangFactory.GetTang()
-                .NewConfigurationBuilder(taskConfig)
-                .BindIntNamedParam<EnforceCloseTimeoutMilliseconds>("1000")
-                .BindNamedParameter<EnforceClose, bool>(GenericType<EnforceClose>.Class, "true")
+                .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByCancellationTask>.Class)
+                .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByCancellationTask>.Class)
                 .Build();
         }
 
@@ -321,7 +291,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);
 
             [Inject]
-            private CloseByReturnTestTask([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeout)
+            private CloseByReturnTestTask()
             {
             }
 
@@ -364,108 +334,69 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        /// <summary>
-        /// This is a testing task. It serves for two test cases.
-        /// In the first case, EnforceClose is false (default). When the task receives the close event, it signals the Call method
-        /// to let it continue the iteration. As _shouldCloseTask is set to 1, the Call() will return after
-        /// completing the current iteration.
-        /// In the second case, EnforceClose is set to true. When the task receives the close event, it sets
-        /// _shouldCloseTask to 1. As the task is hung in this scenario, Call() would never return.
-        ///  After waiting for _enforceCloseTimeoutMilliseconds, the close handler throws an exception, enforcing the task to stop.
-        /// </summary>
-        private sealed class CloseByBreakAndEnforceToStopTask : ITask, IObserver<ICloseEvent>
+        private sealed class CloseByCancellationTask : ITask, IObserver<ICloseEvent>
         {
-            private long _shouldCloseTask = 0;
-            private long _isTaskStopped = 0;
-            private readonly bool _enforceClose;
-            private readonly int _enforceCloseTimeoutMilliseconds;
+            /// <summary>
+            /// Task close Coordinator to handle the work when receiving task close event
+            /// </summary>
+            private readonly TaskCloseCoordinator _taskCloseCoordinator;
+
+            /// <summary>
+            /// The cancellation token source passed to the TaskCloseCoordinator and used to cancel the blocking read in Call()
+            /// </summary>
+            private readonly CancellationTokenSource _cancellationSource;
 
-            private readonly CountdownEvent _suspendSignal1 = new CountdownEvent(1);
-            private readonly CountdownEvent _suspendSignal2 = new CountdownEvent(1);
-            private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false);
+            /// <summary>
+            /// A blocking collection to simulate a blocking data reading
+            /// </summary>
+            private readonly BlockingCollection<int> _messageQueue;
 
             [Inject]
-            private CloseByBreakAndEnforceToStopTask(
-                [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds,
-                [Parameter(typeof(EnforceClose))] bool enforceClose)
+            private CloseByCancellationTask(TaskCloseCoordinator taskCloseCoordinator)
             {
-                _enforceClose = enforceClose;
-                _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
+                _taskCloseCoordinator = taskCloseCoordinator;
+                _cancellationSource = new CancellationTokenSource();
+                _messageQueue = new BlockingCollection<int>();
             }
 
+            /// <summary>
+            /// Blocks on the message queue until the read is canceled, then signals the TaskCloseCoordinator
+            /// </summary>
+            /// <param name="memento"></param>
+            /// <returns></returns>
             public byte[] Call(byte[] memento)
             {
-                int iterate = 1;
-
-                while (Interlocked.Read(ref _shouldCloseTask) == 0 && iterate < 100)
+                try
                 {
-                    iterate++;
-                    if (_enforceClose)
-                    {
-                        _suspendSignal1.Wait();
-                    }
-                    else
-                    {
-                        _suspendSignal2.Wait();
-                    }
+                    _messageQueue.Take(_cancellationSource.Token);
                 }
-
-                Interlocked.Exchange(ref _isTaskStopped, 1);
-
-                if (Interlocked.Read(ref _shouldCloseTask) == 1)
+                catch (OperationCanceledException)
                 {
-                    Logger.Log(Level.Info, BreakTaskMessage);
-                    _waitToCloseEvent.Set();
+                    Logger.Log(Level.Info, CancelTaskMessage);
                 }
-
+                _taskCloseCoordinator.SignalTaskStopped();
                 return null;
             }
 
             public void Dispose()
             {
-                Logger.Log(Level.Info, "Task is disposed.");
             }
 
             /// <summary>
-            /// When the close event is received, it sets _shouldCloseTask to 1.
-            /// If _enforceClose is false, _suspendSignal2 is signaled to let the task to continue to run. This is to simulate that the
-            /// task is running properly and will break after completing the current iteration. It will set the _waitToCloseEvent
-            /// to let the flow in the close event handler to continue.
-            /// If _enforceClose is true,  _suspendSignal1 will be not signaled, this is to simulate that the task is hung.
-            /// After waiting for specified time, the close handler will throw exception to enforce the task to stop.
+            /// Task close handler. Call TaskCloseCoordinator to handle the event.
             /// </summary>
             /// <param name="closeEvent"></param>
             public void OnNext(ICloseEvent closeEvent)
             {
-                if (closeEvent.Value.IsPresent() && Encoding.UTF8.GetString(closeEvent.Value.Value).Equals(DisposeMessageFromDriver))
-                {
-                    Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(closeEvent.Value.Value));
-                    Interlocked.Exchange(ref _shouldCloseTask, 1);
-                    if (!_enforceClose)
-                    {
-                        _suspendSignal2.Signal();
-                    }
-
-                    _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
-
-                    if (Interlocked.Read(ref _isTaskStopped) == 0)
-                    {
-                        Logger.Log(Level.Info, EnforceToCloseMessage);
-                        throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                    }
-                }
-                else
-                {
-                    throw new Exception("Expected close event message is not received.");
-                }
+                _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
             }
 
-            public void OnCompleted()
+            public void OnError(Exception error)
             {
                 throw new NotImplementedException();
             }
 
-            public void OnError(Exception error)
+            public void OnCompleted()
             {
                 throw new NotImplementedException();
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index c3521cd..b462438 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -16,9 +16,13 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Network;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities;
@@ -30,24 +34,23 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     /// <summary>
     /// This is to test close event handler in IMRU tasks
-    /// The test provide IRunningTask, IFailedTask and ICOmpletedTask handlers so that to trigger close events and handle the 
+    /// The test provides IRunningTask, IFailedTask and ICompletedTask handlers to trigger close events and handle the
     /// failed tasks and completed tasks
     /// </summary>
     [Collection("FunctionalTests")]
     public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
     {
         private const string CompletedTaskMessage = "CompletedTaskMessage";
+        private const string FailEvaluatorMessage = "FailEvaluatorMessage";
         private const string FailTaskMessage = "FailTaskMessage";
 
         /// <summary>
         /// This test is for running in local runtime
         /// It sends close event for all the running tasks.
-        /// It first informs the Call method to stop.
-        /// If Call method is running properly, it will respect to this flag and will return properly, that will end up ICompletedTask event.
-        ////If Call method is hung some where and cannot be returned, the close handler will throw exception, that would cause IFailedTask event.
-        /// As we are testing IMRU Task not a test task, the behavior is not deterministic. It can be CompletedTask or FailedTask
-        /// No matter how the task is closed, the total number of completed task and failed task should be equal to the 
-        /// total number of the tasks.
+        /// In the task close handler, the cancellation token is cancelled; as a result, tasks return from the Call()
+        /// method and the driver receives ICompletedTask.
+        /// In the exceptional case, a task might throw an exception from Call(); as a result, the driver receives IFailedTask.
+        /// The number of CompletedTask plus FailedTask events is expected to equal the total number of tasks, with no failed Evaluator.
         /// </summary>
         [Fact]
         public void TestTaskCloseOnLocalRuntime()
@@ -55,27 +58,21 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int chunkSize = 2;
             const int dims = 50;
             const int iterations = 200;
-            const int mapperMemory = 5120;
-            const int updateTaskMemory = 5120;
+            const int mapperMemory = 512;
+            const int updateTaskMemory = 512;
             const int numTasks = 4;
             var testFolder = DefaultRuntimeFolder + TestId;
             TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder);
-            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
-            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120);
             var completedCount = GetMessageCount(lines, CompletedTaskMessage);
-            Assert.Equal(numTasks, failedCount + completedCount);
+            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            Assert.Equal(numTasks, completedCount + failedCount);
             CleanUp(testFolder);
         }
 
         /// <summary>
         /// Same testing for running on YARN
         /// It sends close event for all the running tasks.
-        /// It first informs the Call method to stop.
-        /// If Call method is running properly, it will respect to this flag and will return properly, that will end up ICompletedTask event.
-        ////If Call method is hung some where and cannot be returned, the close handler will throw exception, that would cause IFailedTask event.
-        /// As we are testing IMRU Task not a test task, the behavior is not deterministic. It can be CompletedTask or FailedTask
-        /// No matter how the task is closed, the total number of completed task and failed task should be equal to the 
-        /// total number of the tasks.
         /// </summary>
         [Fact(Skip = "Requires Yarn")]
         public void TestTaskCloseOnLocalRuntimeOnYarn()
@@ -83,15 +80,15 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int chunkSize = 2;
             const int dims = 50;
             const int iterations = 200;
-            const int mapperMemory = 5120;
-            const int updateTaskMemory = 5120;
+            const int mapperMemory = 512;
+            const int updateTaskMemory = 512;
             const int numTasks = 4;
             TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
         }
 
         /// <summary>
         /// This method overrides base class method and defines its own event handlers for driver. 
-        /// It uses its own RunningTaskHandler, FailedTaskHandler and CompletedTaskHandler so that to simulate the test scenarios 
+        /// It uses its own RunningTaskHandler, CompletedTaskHandler, FailedTaskHandler and FailedEvaluatorHandler to simulate the test scenarios
         /// and verify the test result. 
         /// Rest of the event handlers use those from IMRUDriver. In IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
         /// </summary>
@@ -112,7 +109,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
                     GenericType<TestHandlers>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
-                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                    GenericType<TestHandlers>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
@@ -124,43 +121,112 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
+        /// Mapper function configuration. Add TcpConfiguration to the base configuration
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            return Configurations.Merge(GetTcpConfiguration(), base.BuildMapperFunctionConfig());
+        }
+
+        /// <summary>
+        /// Update function configuration. Add TcpConfiguration to the base configuration.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            return Configurations.Merge(GetTcpConfiguration(), base.BuildUpdateFunctionConfig());
+        }
+
+        /// <summary>
+        /// Override default setting for retry policy
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration GetTcpConfiguration()
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "5")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000")
+                .Build();
+        }
+
+        /// <summary>
         /// Test handlers
         /// </summary>
-        internal sealed class TestHandlers : IObserver<IRunningTask>, IObserver<IFailedTask>, IObserver<ICompletedTask>
+        internal sealed class TestHandlers : IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>, IObserver<IFailedEvaluator>
         {
+            private readonly ISet<IRunningTask> _runningTasks = new HashSet<IRunningTask>();
+            private readonly object _lock = new object();
+
             [Inject]
             private TestHandlers()
             {
             }
 
             /// <summary>
-            /// Log the task id and dispose the context
+            /// Add the RunningTask to _runningTasks; when the set reaches four tasks, dispose the one just received
             /// </summary>
             public void OnNext(IRunningTask value)
             {
-                Logger.Log(Level.Info, "Received running task, closing it" + value.Id);
-                value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, "Received running task:" + value.Id);
+                    _runningTasks.Add(value);
+                    if (_runningTasks.Count == 4)
+                    {
+                        Logger.Log(Level.Info, "Dispose running task from driver:" + value.Id);
+                        value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                        _runningTasks.Remove(value);
+                    }
+                }
             }
 
             /// <summary>
-            /// Validate the event and dispose the context
+            /// Log the task id and FailTaskMessage
+            /// Close the rest of the running tasks, then dispose the context
             /// </summary>
             /// <param name="value"></param>
             public void OnNext(IFailedTask value)
             {
-                Logger.Log(Level.Info, FailTaskMessage + value.Id);
-                var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value);
-                Assert.Contains(TaskManager.TaskKilledByDriver, failedExeption);
-                value.GetActiveContext().Value.Dispose();
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, FailTaskMessage + value.Id);
+                    CloseRunningTasks();
+                    value.GetActiveContext().Value.Dispose();
+                }
+            }
+
+            /// <summary>
+            /// No Failed Evaluator is expected
+            /// </summary>
+            /// <param name="value"></param>
+            public void OnNext(IFailedEvaluator value)
+            {
+                throw new Exception(FailEvaluatorMessage);
             }
 
             /// <summary>
-            /// Log the task id and dispose the context
+            /// Log the task id and CompletedTaskMessage
+            /// Close the rest of the running tasks, then dispose the context
             /// </summary>
             public void OnNext(ICompletedTask value)
             {
-                Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
-                value.ActiveContext.Dispose();
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
+                    CloseRunningTasks();
+                    value.ActiveContext.Dispose();
+                }
+            }
+
+            private void CloseRunningTasks()
+            {
+                foreach (var task in _runningTasks)
+                {
+                    Logger.Log(Level.Info, "Dispose running task from driver:" + task.Id);
+                    task.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                }
+                _runningTasks.Clear();
             }
 
             public void OnCompleted()

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 149cc9b..4923940 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -252,11 +252,11 @@ namespace Org.Apache.REEF.Tests.Functional
             }
         }
 
-        internal string[] ReadLogFile(string logFileName, string subfolder = "driver", string testFolder = DefaultRuntimeFolder)
+        internal string[] ReadLogFile(string logFileName, string subfolder = "driver", string testFolder = DefaultRuntimeFolder, int retryCount = 60)
         {
             string fileName = string.Empty;
             string[] lines = null;
-            for (int i = 0; i < 60; i++)
+            for (int i = 0; i < retryCount; i++)
             {
                 try
                 {
@@ -266,12 +266,12 @@ namespace Org.Apache.REEF.Tests.Functional
                 }
                 catch (Exception e)
                 {
-                    if (i == 59)
+                    if (i == retryCount - 1)
                     {
                         // log only last exception before failure
                         Logger.Log(Level.Verbose, e.ToString());
                     }
-                    if (i < 59)
+                    if (i < retryCount - 1)
                     {
                         Thread.Sleep(SleepTime);
                     }