You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/11/21 20:29:27 UTC

[1/2] reef git commit: [REEF-1448] Support for upstream heartbeats Adding job cancellation functionality into IMRU driver. And exposing ICancellationDetector interface in API which enables client to provide custom cancellation detection from upstream. Im

Repository: reef
Updated Branches:
  refs/heads/master 4e4e568fa -> 1fd4f2655


http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 588baf1..55679d8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -118,6 +118,7 @@ under the License.
     <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
+    <Compile Include="Functional\IMRU\IMRUBroadcastReduceJobCancelledTest.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceWithFilePartitionDataSetTest.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceWithLocalFile.cs" />


[2/2] reef git commit: [REEF-1448] Support for upstream heartbeats Adding job cancellation functionality into IMRU driver. And exposing ICancellationDetector interface in API which enables client to provide custom cancellation detection from upstream. Im

Posted by ju...@apache.org.
[REEF-1448] Support for upstream heartbeats
Adding job cancellation functionality to the IMRU driver,
and exposing the IJobCancelledDetector interface in the API, which enables clients to provide custom cancellation detection from upstream.
Implementation: JobLifecycleManager is registered as the IJobCancelled event source.
The manager checks the IsJobCancelled method of the configured cancellation detector on each timer event and,
if a cancellation signal is detected, JobLifecycleManager fires a JobCancelled event.
The IMRU driver is registered as an IJobCancelled observer; when the event is fired, it shuts down all evaluators and moves to the Fail state.
Similar to failure handling, when the job is cancelled the IMRU driver throws an exception instead of retrying the job
(there is no other way to communicate the driver status).
In addition, JobLifecycleManager asks the resource manager (RM) to kill the current job; if that succeeds, the job status is marked as KILLED instead of FAILED.

Clients can provide an implementation of IJobCancelledDetector in the job definition.
This configuration is optional: if no implementation is configured in the definition, the driver assumes there is no cancellation signal.

JIRA:
  [REEF-1448] (https://issues.apache.org/jira/browse/REEF-1448)

This closes #1161


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1fd4f265
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1fd4f265
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1fd4f265

Branch: refs/heads/master
Commit: 1fd4f2655406b7ce0f9e9b8c0d954d1941e60880
Parents: 4e4e568
Author: andrey-me <an...@microsoft.com>
Authored: Fri Oct 14 15:57:12 2016 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Mon Nov 21 12:21:54 2016 -0800

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Driver/IJobCancelled.cs |  37 +++
 .../Org.Apache.REEF.Driver.csproj               |   1 +
 .../IMRUJobDefinitionBuilderTests.cs            |  82 +++++
 .../ImruDriverCancelTests.cs                    | 300 +++++++++++++++++++
 .../JobLifecycleManagerTest.cs                  | 257 ++++++++++++++++
 .../Org.Apache.REEF.IMRU.Tests.csproj           |   7 +
 .../API/IJobCancelledDetector.cs                |  39 +++
 .../API/IMRUJobDefinition.cs                    |  12 +
 .../API/IMRUJobDefinitionBuilder.cs             |  14 +
 .../OnREEF/Client/REEFIMRUClient.cs             |   3 +-
 .../OnREEF/Driver/IJobLifeCycleManager.cs       |  33 ++
 .../OnREEF/Driver/IMRUDriver.cs                 |  76 ++++-
 .../JobCancellationDetectorAlwaysFalse.cs       |  39 +++
 .../OnREEF/Driver/JobCancelled.cs               |  39 +++
 .../OnREEF/Driver/JobLifeCycleManager.cs        | 252 ++++++++++++++++
 .../OnREEF/Parameters/SleepIntervalParameter.cs |  26 ++
 .../Org.Apache.REEF.IMRU.csproj                 |   6 +
 .../Interface/IConfigurationBuilder.cs          |   2 +-
 .../IMRU/IMRUBroadcastReduceJobCancelledTest.cs |  79 +++++
 ...oadcastReduceWithFilePartitionDataSetTest.cs |   2 +-
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |  56 +++-
 .../Functional/IMRU/TestFailMapperEvaluators.cs |   4 +-
 .../Functional/ReefFunctionalTest.cs            |   9 +-
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 24 files changed, 1353 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Driver/IJobCancelled.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IJobCancelled.cs b/lang/cs/Org.Apache.REEF.Driver/IJobCancelled.cs
new file mode 100644
index 0000000..9fae78e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/IJobCancelled.cs
@@ -0,0 +1,37 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Driver
+{
+    /// <summary>
+    /// Event raised when a job cancellation signal is detected.
+    /// </summary>
+    public interface IJobCancelled
+    {
+        /// <summary>
+        /// Gets the time at which the cancellation event was generated.
+        /// </summary>
+        DateTime Timestamp { get; }
+
+        /// <summary>
+        /// Gets cancellation details, such as the cancellation reason or error information.
+        /// </summary>
+        string Message { get; }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 9bebad6..09f5948 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -139,6 +139,7 @@ under the License.
     <Compile Include="Evaluator\IEvaluatorRequest.cs" />
     <Compile Include="Evaluator\IEvaluatorRequestor.cs" />
     <Compile Include="Evaluator\IFailedEvaluator.cs" />
+    <Compile Include="IJobCancelled.cs" />
     <Compile Include="Task\JavaTaskException.cs" />
     <Compile Include="IDriver.cs" />
     <Compile Include="IDriverRestarted.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU.Tests/IMRUJobDefinitionBuilderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/IMRUJobDefinitionBuilderTests.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/IMRUJobDefinitionBuilderTests.cs
new file mode 100644
index 0000000..838aa54
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/IMRUJobDefinitionBuilderTests.cs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Reflection;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    /// <summary>
+    /// Unit tests for the job cancellation configuration handled by IMRUJobDefinitionBuilder.
+    /// </summary>
+    public class IMRUJobDefinitionBuilderTests
+    {
+        /// <summary>
+        /// When no cancellation configuration is set, the built definition carries the builder's
+        /// private static EmptyConfiguration instance.
+        /// </summary>
+        [Fact]
+        public void JobDefinitionBuilderCancellationConfigIsOptional()
+        {
+            var builder = CreateTestBuilder();
+            var definition = builder.Build();
+
+            // EmptyConfiguration is private, so it is read via reflection. Asserting that the field
+            // exists turns a future rename into a clear failure instead of a NullReferenceException.
+            var emptyConfigurationField = typeof(IMRUJobDefinitionBuilder)
+                .GetField("EmptyConfiguration", BindingFlags.NonPublic | BindingFlags.Static);
+            Assert.NotNull(emptyConfigurationField);
+
+            // The field is static, so GetValue takes no target instance.
+            var defaultConfig = emptyConfigurationField.GetValue(null) as IConfiguration;
+
+            // Without this check the test would pass vacuously if both values were null.
+            Assert.NotNull(defaultConfig);
+            Assert.Same(defaultConfig, definition.JobCancelSignalConfiguration);
+        }
+
+        /// <summary>
+        /// Explicitly setting the cancellation configuration to null is preserved by the builder.
+        /// </summary>
+        [Fact]
+        public void JobDefinitionBuilderCancellationConfigIsSetToNull()
+        {
+            var definition = CreateTestBuilder()
+               .SetJobCancellationConfiguration(null)
+               .Build();
+
+            Assert.Null(definition.JobCancelSignalConfiguration);
+        }
+
+        /// <summary>
+        /// A configuration passed to SetJobCancellationConfiguration is stored as-is on the definition.
+        /// </summary>
+        [Fact]
+        public void JobDefinitionBuilderSetsJobCancellationConfig()
+        {
+            var cancelSignalConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IJobCancelledDetector>.Class,
+                    GenericType<JobCancellationDetectorAlwaysFalse>.Class)
+                .Build();
+
+            var definition = CreateTestBuilder()
+                .SetJobCancellationConfiguration(cancelSignalConfig)
+                .Build();
+
+            Assert.NotNull(definition.JobCancelSignalConfiguration);
+            Assert.Same(cancelSignalConfig, definition.JobCancelSignalConfiguration);
+        }
+
+        /// <summary>
+        /// Creates a builder with the job name set to "Test" and the map, codec, reduce and update
+        /// configurations set to an empty configuration, so that Build() can be called in these tests.
+        /// </summary>
+        private static IMRUJobDefinitionBuilder CreateTestBuilder()
+        {
+            var testConfig = TangFactory.GetTang().NewConfigurationBuilder().Build();
+
+            return new IMRUJobDefinitionBuilder()
+                .SetJobName("Test")
+                .SetMapFunctionConfiguration(testConfig)
+                .SetMapInputCodecConfiguration(testConfig)
+                .SetUpdateFunctionCodecsConfiguration(testConfig)
+                .SetReduceFunctionConfiguration(testConfig)
+                .SetUpdateFunctionConfiguration(testConfig);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
new file mode 100644
index 0000000..921c54b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Reflection;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    /// <summary>
+    /// Unit tests for the job cancellation handling in IMRUDriver.
+    /// </summary>
+    public class ImruDriverCancelTests
+    {
+        [Fact]
+        [Trait("Description", "Verifies that IMRU driver handles cancel signal: changes state to Fail and throw exception with predefined message.")]
+        public void ImruDriverHandlesCancelledEventAfterStart()
+        {
+            var driver = TangFactory
+                    .GetTang()
+                    .NewInjector(GetDriverConfig<TestMapInput, TestMapOutput, TestResult, TestPartitionType>())
+                    .GetInstance(typeof(IMRUDriver<TestMapInput, TestMapOutput, TestResult, TestPartitionType>))
+                as IMRUDriver<TestMapInput, TestMapOutput, TestResult, TestPartitionType>;
+            Assert.NotNull(driver);
+
+            // NOTE(review): a null IDriverStarted is passed; this relies on the driver's
+            // start handler not dereferencing the event.
+            IDriverStarted startedEvent = null;
+            driver.OnNext(startedEvent);
+
+            var cancelMessage = "cancel_" + Guid.NewGuid();
+            var cancelTime = DateTime.Now;
+            IJobCancelled cancelledEvent = new JobCancelled(cancelTime, cancelMessage);
+
+            Assert.False(GetDriverState(driver).CurrentState == SystemState.Fail, "driver's state is Fail after Onstarted event");
+
+            // The exception message must mention the cancellation, its timestamp ("u" format) and its message.
+            AssertExceptionThrown<ApplicationException>(
+                () => driver.OnNext(cancelledEvent),
+                expectedExceptionMessageContent: new[] { "Job cancelled", cancelTime.ToString("u"), cancelMessage },
+                assertMessagePrefix: "Cancel event handler failed to throw expected exception");
+
+            var stateAfterCancel = GetDriverState(driver);
+            Assert.True(stateAfterCancel.CurrentState == SystemState.Fail, "invalid driver state after cancel event: expected= Fail, actual=" + stateAfterCancel.CurrentState);
+        }
+
+        /// <summary>
+        /// Reads the private _systemState field of the given driver instance via reflection.
+        /// </summary>
+        private static SystemStateMachine GetDriverState(object driver)
+        {
+            var stateField = driver.GetType()
+                .GetField("_systemState", BindingFlags.Instance | BindingFlags.NonPublic);
+            Assert.NotNull(stateField);
+
+            return stateField.GetValue(driver) as SystemStateMachine;
+        }
+
+        /// <summary>
+        /// Runs the given action and asserts that it throws an exception of type
+        /// <typeparamref name="TException"/> (or a derived type) whose message contains every expected string.
+        /// </summary>
+        /// <param name="actionWithException">Action that is expected to throw.</param>
+        /// <param name="expectedExceptionMessageContent">Substrings that must all appear in the exception message.</param>
+        /// <param name="assertMessagePrefix">Prefix used for every assertion failure message.</param>
+        private static void AssertExceptionThrown<TException>(Action actionWithException,
+            IEnumerable<string> expectedExceptionMessageContent,
+            string assertMessagePrefix)
+            where TException : Exception
+        {
+            // Capture the exception and assert outside of the try block. Asserting inside it lets the
+            // catch clause intercept xUnit's own failure and report a misleading "wrong type" message.
+            Exception caught = null;
+            try
+            {
+                actionWithException();
+            }
+            catch (Exception ex)
+            {
+                caught = ex;
+            }
+
+            Assert.True(caught != null, assertMessagePrefix + " action did not result in any exception");
+            Assert.True(caught is TException,
+                string.Format("{0}:  expected exception of type: {1}, actual type: {2}", assertMessagePrefix, typeof(TException), caught.GetType()));
+
+            // Materialized once so the filter is not evaluated twice (count check and error message).
+            var missingContent = expectedExceptionMessageContent
+                .Where(expectedContent => !caught.Message.Contains(expectedContent))
+                .ToList();
+
+            Assert.False(
+                missingContent.Count > 0,
+                string.Format("{0}: Expected content not found in exception message. Missing content: {1}, actual message: {2}", assertMessagePrefix, string.Join(" | ", missingContent), caught.Message));
+        }
+
+        /// <summary>
+        /// Builds a minimal driver configuration that is sufficient for Tang to construct an IMRUDriver instance,
+        /// but not for running a real job. It is used to unit test specific code paths
+        /// (the job cancelled event in this case).
+        /// </summary>
+        private static IConfiguration GetDriverConfig<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            // Test doubles that satisfy the injection of IMRUDriver without real data or evaluators.
+            var testConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IPartitionedInputDataSet>.Class, GenericType<TestPartitionedInputDataSet>.Class)
+                .BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<TestEvaluatorRequestor>.Class)
+                .Build();
+
+            var jobDefinition = new IMRUJobDefinitionBuilder()
+                .SetJobName("Test")
+                .SetMapFunctionConfiguration(testConfig)
+                .SetMapInputCodecConfiguration(testConfig)
+                .SetUpdateFunctionCodecsConfiguration(testConfig)
+                .SetReduceFunctionConfiguration(testConfig)
+                .SetUpdateFunctionConfiguration(testConfig)
+                .SetPartitionedDatasetConfiguration(testConfig)
+                .Build();
+
+            var configurationSerializer = new AvroConfigurationSerializer();
+
+            var overallPerMapConfig = Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray());
+            var driverConfig = TangFactory.GetTang().NewConfigurationBuilder(new[]
+                {
+                    DriverConfiguration.ConfigurationModule
+                        .Set(DriverConfiguration.OnEvaluatorAllocated,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnDriverStarted,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnContextActive,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnTaskCompleted,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnEvaluatorFailed,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnContextFailed,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnTaskFailed,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.OnTaskRunning,
+                            GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                        .Set(DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
+                        .Build(),
+                    TangFactory.GetTang().NewConfigurationBuilder()
+                        .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>("driverId")
+                        .BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(IMRUConstants.UpdateTaskName)
+                        .BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(
+                            IMRUConstants.CommunicationGroupName)
+                        .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(
+                            IMRUConstants.TreeFanout.ToString(CultureInfo.InvariantCulture))
+                        // One task per mapper plus the update (master) task.
+                        .BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(
+                            (jobDefinition.NumberOfMappers + 1).ToString(CultureInfo.InvariantCulture))
+                        .BindImplementation(GenericType<IGroupCommDriver>.Class, GenericType<GroupCommDriver>.Class)
+                        .Build(),
+                    jobDefinition.PartitionedDatasetConfiguration,
+                    overallPerMapConfig,
+                    jobDefinition.JobCancelSignalConfiguration
+                })
+                .BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
+                    configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
+                .BindNamedParameter(typeof(SerializedMapTaskStateConfiguration),
+                    configurationSerializer.ToString(jobDefinition.MapTaskStateConfiguration))
+                .BindNamedParameter(typeof(SerializedMapConfiguration),
+                    configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
+                .BindNamedParameter(typeof(SerializedUpdateConfiguration),
+                    configurationSerializer.ToString(jobDefinition.UpdateFunctionConfiguration))
+                .BindNamedParameter(typeof(SerializedMapInputCodecConfiguration),
+                    configurationSerializer.ToString(jobDefinition.MapInputCodecConfiguration))
+                .BindNamedParameter(typeof(SerializedMapInputPipelineDataConverterConfiguration),
+                    configurationSerializer.ToString(jobDefinition.MapInputPipelineDataConverterConfiguration))
+                .BindNamedParameter(typeof(SerializedUpdateFunctionCodecsConfiguration),
+                    configurationSerializer.ToString(jobDefinition.UpdateFunctionCodecsConfiguration))
+                .BindNamedParameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration),
+                    configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
+                .BindNamedParameter(typeof(SerializedReduceConfiguration),
+                    configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+                .BindNamedParameter(typeof(SerializedResultHandlerConfiguration),
+                    configurationSerializer.ToString(jobDefinition.ResultHandlerConfiguration))
+                .BindNamedParameter(typeof(MemoryPerMapper),
+                    jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MemoryForUpdateTask),
+                    jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(CoresPerMapper),
+                    jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(CoresForUpdateTask),
+                    jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
+                    jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(InvokeGC),
+                    jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            return driverConfig;
+        }
+
+        internal class TestMapInput
+        {
+            [Inject]
+            private TestMapInput()
+            {
+            }
+        }
+
+        internal class TestMapOutput
+        {
+            [Inject]
+            private TestMapOutput()
+            {
+            }
+        }
+
+        internal class TestResult
+        {
+            [Inject]
+            private TestResult()
+            {
+            }
+        }
+
+        internal class TestPartitionType
+        {
+            [Inject]
+            private TestPartitionType()
+            {
+            }
+        }
+
+        /// <summary>
+        /// Minimal IEvaluatorRequestor implementation that lets Tang construct an IMRUDriver in tests.
+        /// It does not submit any evaluator requests.
+        /// </summary>
+        internal class TestEvaluatorRequestor : IEvaluatorRequestor
+        {
+            public IResourceCatalog ResourceCatalog { get; private set; }
+
+            [Inject]
+            private TestEvaluatorRequestor()
+            {
+            }
+
+            public void Submit(IEvaluatorRequest request)
+            {
+                // For the test no evaluator request is actually submitted. This must not throw,
+                // because the driver calls this method before the cancellation flow can be initiated.
+            }
+
+            public EvaluatorRequestBuilder NewBuilder()
+            {
+                // Reflection is used so that a non-public constructor of EvaluatorRequestBuilder can be invoked.
+                var builder = Activator.CreateInstance(
+                    typeof(EvaluatorRequestBuilder),
+                    nonPublic: true);
+                return builder as EvaluatorRequestBuilder;
+            }
+
+            /// <summary>
+            /// Ignores <paramref name="request"/> and returns the result of <see cref="NewBuilder()"/>.
+            /// </summary>
+            public EvaluatorRequestBuilder NewBuilder(IEvaluatorRequest request)
+            {
+                return NewBuilder();
+            }
+        }
+
+        /// <summary>
+        /// Minimal IPartitionedInputDataSet implementation that lets Tang construct an IMRUDriver in tests.
+        /// It enumerates no partitions.
+        /// </summary>
+        internal class TestPartitionedInputDataSet : IPartitionedInputDataSet
+        {
+            public int Count { get; private set; }
+            public string Id { get; private set; }
+
+            [Inject]
+            private TestPartitionedInputDataSet()
+            {
+            }
+
+            public IEnumerator<IPartitionDescriptor> GetEnumerator()
+            {
+                return Enumerable.Empty<IPartitionDescriptor>().GetEnumerator();
+            }
+
+            IEnumerator IEnumerable.GetEnumerator()
+            {
+                return GetEnumerator();
+            }
+
+            public IPartitionDescriptor GetPartitionDescriptorForId(string partitionId)
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU.Tests/JobLifecycleManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/JobLifecycleManagerTest.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/JobLifecycleManagerTest.cs
new file mode 100644
index 0000000..8df82d1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/JobLifecycleManagerTest.cs
@@ -0,0 +1,257 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Threading;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Xunit;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    /// <summary>
+    /// Tests for JobLifeCycleManager. IJobCancelled events must reach subscribed observers when the
+    /// configured IJobCancelledDetector reports a cancellation. The detector must only be polled
+    /// while at least one observer is subscribed.
+    /// </summary>
+    /// <remarks>
+    /// The manager polls the detector and notifies observers while the test thread sleeps, i.e. from
+    /// another thread. Poll counters are therefore updated with Interlocked and read with Volatile.Read,
+    /// and each observer keeps its last event in a volatile field.
+    /// </remarks>
+    public class JobLifecycleManagerTest
+    {
+        [Fact]
+        [Trait("Description", "Verify that JobCancelled event is sent when cancellation signal is detected.")]
+        public void JobLifeCycleMangerSendsJobCancelledEvent()
+        {
+            string expectedMessage = "cancelled";
+            var observer = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(true, expectedMessage))
+                .FirstOrDefault();
+
+            AssertCancelEvent(observer, true, expectedMessage);
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that JobCancelled Event can be sent to all subscribers in case of multiply observers.")]
+        public void JobLifeCycleMangerSendsJobCancelledEventToMultiplyObservers()
+        {
+            const int observerCount = 3;
+            string expectedMessage = "cancelled";
+
+            // Pass an explicit observer count. The helper defaults to a single observer,
+            // which would not exercise delivery to multiple subscribers.
+            var observers = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(true, expectedMessage),
+                observerCount: observerCount)
+                .ToList();
+
+            Assert.Equal(observerCount, observers.Count);
+            foreach (var observer in observers)
+            {
+                AssertCancelEvent(observer, true, expectedMessage);
+            }
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that IsCancelled check is performed with specified period.")]
+        public void JobLifeCycleMangerChecksDetectorPeriodically()
+        {
+            string expectedMessage = "cancelled";
+            int isCancelledCheckCounter = 0;
+
+            var observer = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(true, expectedMessage, testAction: () => Interlocked.Increment(ref isCancelledCheckCounter)),
+                signalCheckPeriodSec: 1,
+                waitForEventPeriodSec: 6)
+                .FirstOrDefault();
+
+            var checkCount = Volatile.Read(ref isCancelledCheckCounter);
+            Assert.True(checkCount >= 5, "Expected 5+ IsCancelled checks in 6 sec (check interval = 1 sec). Actual check counter: " + checkCount);
+            AssertCancelEvent(observer, true, expectedMessage);
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that JobLifecycle manager does not sent any cancellation events if signal is not generated.")]
+        public void JobLifeCycleMangerNoSignalDoesNotSendEvent()
+        {
+            var observer = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(false))
+                .FirstOrDefault();
+
+            AssertCancelEvent(observer, false);
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that no cancellation event is sent if configured detector is null.")]
+        public void JobLifeCycleMangerDetectorNullDoesNotSendEvent()
+        {
+            var observer = JobLifeCycleMangerEventTest(
+                detector: null)
+                .FirstOrDefault();
+
+            AssertCancelEvent(observer, false);
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that cancellation checks are not performed if there are no observers.")]
+        public void JobLifeCycleMangerNoObserversDoesNotCheckForSignal()
+        {
+            int isCancelledCheckCounter = 0;
+
+            var observer = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(true, "cancelled", testAction: () => Interlocked.Increment(ref isCancelledCheckCounter)),
+                subscribeObserver: false,
+                signalCheckPeriodSec: 1,
+                waitForEventPeriodSec: 6)
+                .FirstOrDefault();
+
+            var checkCount = Volatile.Read(ref isCancelledCheckCounter);
+            Assert.True(checkCount == 0, "Expected no checks for cancellation if there are no subscribers. Actual check counter: " + checkCount);
+            AssertCancelEvent(observer, false);
+        }
+
+        [Fact]
+        [Trait("Description", "Verify that manager stops checking for cancellation signal after all observers unsubscribed.")]
+        public void JobLifeCycleMangerNoCancellationChecksAfterAllObserversUnsubscribed()
+        {
+            int isCancelledCheckCounter = 0;
+
+            const int signalCheckPeriodSec = 1;
+            const int waitForEventPeriodSec = 6;
+
+            var observer = JobLifeCycleMangerEventTest(
+                detector: new SampleJobCancelledDetector(true, "cancelled", testAction: () => Interlocked.Increment(ref isCancelledCheckCounter)),
+                subscribeObserver: false,
+                signalCheckPeriodSec: signalCheckPeriodSec,
+                waitForEventPeriodSec: waitForEventPeriodSec)
+                .FirstOrDefault();
+
+            var checksBeforeSubscribe = Volatile.Read(ref isCancelledCheckCounter);
+            Assert.True(checksBeforeSubscribe == 0, "Expected no checks for cancellation if there are no subscribers. Actual check counter: " + checksBeforeSubscribe);
+
+            // subscribe observer - checks should start incrementing
+            observer.Subscribe();
+            Thread.Sleep(waitForEventPeriodSec * 1000);
+            var checksWhileSubscribed = Volatile.Read(ref isCancelledCheckCounter);
+            Assert.True(checksWhileSubscribed > 0, "Expected checks for cancellation after new subscritpion added. Actual check counter: " + checksWhileSubscribed);
+
+            // Unsubscribe and verify that checks for cancellation stop incrementing.
+            // A check already running when UnSubscribe returns may still finish. Waiting one polling
+            // period before taking the baseline keeps the assertion independent of that race.
+            observer.UnSubscribe();
+            Thread.Sleep(signalCheckPeriodSec * 1000);
+            var counterAfterUnsubscribe = Volatile.Read(ref isCancelledCheckCounter);
+            Thread.Sleep(waitForEventPeriodSec * 1000);
+            var finalCount = Volatile.Read(ref isCancelledCheckCounter);
+            Assert.True(finalCount == counterAfterUnsubscribe, "Expected no checks for cancellation after all subscribers unsubscribed. Actual check counter: " + finalCount + " expected Counter to stay at: " + counterAfterUnsubscribe);
+        }
+
+        /// <summary>
+        /// Creates a JobLifeCycleManager through its non-public (detector, period) constructor and attaches
+        /// <paramref name="observerCount"/> test observers. Observers subscribe immediately when
+        /// <paramref name="subscribeObserver"/> is true. The method then blocks for
+        /// <paramref name="waitForEventPeriodSec"/> seconds so the manager can poll the detector.
+        /// </summary>
+        /// <returns>The created observers, in creation order.</returns>
+        private IEnumerable<TestObserver> JobLifeCycleMangerEventTest(
+            IJobCancelledDetector detector,
+            bool subscribeObserver = true,
+            int observerCount = 1,
+            int signalCheckPeriodSec = 1,
+            int waitForEventPeriodSec = 2)
+        {
+            var manager = (JobLifeCycleManager)Activator.CreateInstance(
+                typeof(JobLifeCycleManager),
+                BindingFlags.NonPublic | BindingFlags.Instance,
+                null,
+                new object[] { detector, signalCheckPeriodSec },
+                null,
+                null);
+
+            var observers = Enumerable.Range(1, observerCount)
+                .Select(_ => new TestObserver(manager, subscribeObserver))
+                .ToList();
+
+            Thread.Sleep(waitForEventPeriodSec * 1000);
+
+            return observers;
+        }
+
+        /// <summary>
+        /// Asserts whether <paramref name="observer"/> received a cancellation event.
+        /// When an event is expected, also asserts that its message equals <paramref name="expectedMessage"/>.
+        /// </summary>
+        private static void AssertCancelEvent(TestObserver observer, bool expectedEvent, string expectedMessage = null)
+        {
+            Assert.NotNull(observer);
+
+            // Read the volatile field once so both assertions inspect the same event instance.
+            var lastEvent = observer.LastEvent;
+            if (expectedEvent)
+            {
+                Assert.NotNull(lastEvent);
+                Assert.Equal(expectedMessage, lastEvent.Message);
+            }
+            else
+            {
+                Assert.Null(lastEvent);
+            }
+        }
+
+        /// <summary>
+        /// Test helper that returns a predefined cancellation answer and message.
+        /// It runs an optional callback on every IsJobCancelled call so tests can count polls.
+        /// </summary>
+        private sealed class SampleJobCancelledDetector : IJobCancelledDetector
+        {
+            private readonly bool _isCancelledResponse;
+            private readonly string _cancellationMessage;
+            private readonly Action _actionOnIsCancelledCall;
+
+            internal SampleJobCancelledDetector(bool isCancelledResponse, string expectedMessage = null, Action testAction = null)
+            {
+                _isCancelledResponse = isCancelledResponse;
+                _cancellationMessage = expectedMessage;
+                _actionOnIsCancelledCall = testAction;
+            }
+
+            public bool IsJobCancelled(out string cancellationMessage)
+            {
+                if (_actionOnIsCancelledCall != null)
+                {
+                    _actionOnIsCancelledCall();
+                }
+
+                cancellationMessage = _cancellationMessage;
+                return _isCancelledResponse;
+            }
+        }
+
+        /// <summary>
+        /// Test helper that records the last JobCancelled event delivered by the lifecycle manager.
+        /// Events arrive on the manager's polling thread, so the last event is kept in a volatile field.
+        /// </summary>
+        private sealed class TestObserver : IObserver<IJobCancelled>
+        {
+            private readonly IObservable<IJobCancelled> _source;
+            private volatile IJobCancelled _lastEvent;
+            private IDisposable _subscription;
+
+            internal TestObserver(IObservable<IJobCancelled> eventSource, bool autoSubscribe)
+            {
+                _source = eventSource;
+                if (autoSubscribe)
+                {
+                    Subscribe();
+                }
+            }
+
+            internal IJobCancelled LastEvent
+            {
+                get { return _lastEvent; }
+            }
+
+            public void OnNext(IJobCancelled value)
+            {
+                _lastEvent = value;
+            }
+
+            public void OnError(Exception error)
+            {
+            }
+
+            public void OnCompleted()
+            {
+            }
+
+            public void Subscribe()
+            {
+                _subscription = _source.Subscribe(this);
+            }
+
+            /// <summary>
+            /// Disposes the current subscription, if any. Calling it without an active subscription is a no-op.
+            /// </summary>
+            public void UnSubscribe()
+            {
+                var subscription = _subscription;
+                _subscription = null;
+                if (subscription != null)
+                {
+                    subscription.Dispose();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 4ce072b..c31fa98 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -48,6 +48,9 @@ under the License.
     </Reference>
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="ImruDriverCancelTests.cs" />
+    <Compile Include="IMRUJobDefinitionBuilderTests.cs" />
+    <Compile Include="JobLifecycleManagerTest.cs" />
     <Compile Include="MapInputWithControlMessageTests.cs" />
     <Compile Include="NaturalSumTest.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
@@ -91,6 +94,10 @@ under the License.
       <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
       <Name>Org.Apache.REEF.Utilities</Name>
     </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+      <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project>
+      <Name>Org.Apache.REEF.IO</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/API/IJobCancelledDetector.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IJobCancelledDetector.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IJobCancelledDetector.cs
new file mode 100644
index 0000000..aacd298
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IJobCancelledDetector.cs
@@ -0,0 +1,39 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+    /// <summary>
+    /// Lets the IMRU driver find out whether the job has been cancelled from upstream,
+    /// for example because the client application that submitted it was cancelled.
+    /// A client supplies its own implementation through the job definition
+    /// (IMRUJobDefinitionBuilder.SetJobCancellationConfiguration). Otherwise Tang falls back to
+    /// JobCancellationDetectorAlwaysFalse, which never reports a cancellation.
+    /// </summary>
+    [DefaultImplementation(typeof(JobCancellationDetectorAlwaysFalse))]
+    public interface IJobCancelledDetector
+    {
+        /// <summary>
+        /// Checks whether a cancellation signal is present.
+        /// </summary>
+        /// <param name="cancellationMessage">Details about the cancellation. May be null when the method returns false.</param>
+        /// <returns>true if a cancellation signal was detected; otherwise false.</returns>
+        bool IsJobCancelled(out string cancellationMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index ffec4a6..e2ffc6d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -38,6 +38,7 @@ namespace Org.Apache.REEF.IMRU.API
         private readonly IConfiguration _mapInputPipelineDataConverterConfiguration;
         private readonly IConfiguration _partitionedDatasetConfiguration;
         private readonly IConfiguration _resultHandlerConfiguration;
+        private readonly IConfiguration _jobCancelSignalConfiguration;
         private readonly int _numberOfMappers;
         private readonly int _memoryPerMapper;
         private readonly int _updateTaskMemory;
@@ -74,6 +75,7 @@ namespace Org.Apache.REEF.IMRU.API
         /// <param name="maxRetryNumberInRecovery">Max number of retries done if first run of IMRU job failed</param>
         /// <param name="jobName">Job name</param>
         /// <param name="invokeGC">Whether to call garbage collector after each iteration</param>
+        /// <param name="jobCancelSignalConfiguration">Configuration of job cancellation signal</param>
         internal IMRUJobDefinition(
             IConfiguration updateTaskStateConfiguration,
             IConfiguration mapTaskStateConfiguration,
@@ -86,6 +88,7 @@ namespace Org.Apache.REEF.IMRU.API
             IConfiguration mapInputPipelineDataConverterConfiguration,
             IConfiguration partitionedDatasetConfiguration,
             IConfiguration resultHandlerConfiguration,
+            IConfiguration jobCancelSignalConfiguration,
             ISet<IConfiguration> perMapConfigGeneratorConfig,
             int numberOfMappers,
             int memoryPerMapper,
@@ -116,6 +119,7 @@ namespace Org.Apache.REEF.IMRU.API
             _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig;
             _invokeGC = invokeGC;
             _resultHandlerConfiguration = resultHandlerConfiguration;
+            _jobCancelSignalConfiguration = jobCancelSignalConfiguration;
         }
 
         /// <summary>
@@ -282,5 +286,13 @@ namespace Org.Apache.REEF.IMRU.API
         {
             get { return _resultHandlerConfiguration; }
         }
+
+        /// <summary>
+        /// Tang configuration for the job's cancellation-signal detection, as passed in by
+        /// IMRUJobDefinitionBuilder when the definition was built.
+        /// </summary>
+        internal IConfiguration JobCancelSignalConfiguration
+        {
+            get
+            {
+                return _jobCancelSignalConfiguration;
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index e1af1a3..bf1b75b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -50,6 +50,7 @@ namespace Org.Apache.REEF.IMRU.API
         private IConfiguration _mapInputPipelineDataConverterConfiguration;
         private IConfiguration _partitionedDatasetConfiguration;
         private IConfiguration _resultHandlerConfiguration;
+        private IConfiguration _jobCancellationConfiguration;
         private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig;
         private bool _invokeGC;
 
@@ -74,6 +75,7 @@ namespace Org.Apache.REEF.IMRU.API
             _maxRetryNumberInRecovery = 0;
             _invokeGC = true;
             _perMapConfigGeneratorConfig = new HashSet<IConfiguration>();
+            _jobCancellationConfiguration = EmptyConfiguration;
         }
 
         /// <summary>
@@ -305,6 +307,17 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Sets the Tang configuration used for cancellation signal detection, i.e. the configuration
+        /// that binds a custom IJobCancelledDetector implementation. If this method is never called,
+        /// an empty configuration is used and the default detector applies.
+        /// </summary>
+        /// <param name="cancelSignalConfiguration">Configuration for cancellation signal detection. Must not be null.</param>
+        /// <returns>This builder, to allow call chaining.</returns>
+        /// <exception cref="System.ArgumentNullException">If <paramref name="cancelSignalConfiguration"/> is null.</exception>
+        public IMRUJobDefinitionBuilder SetJobCancellationConfiguration(IConfiguration cancelSignalConfiguration)
+        {
+            // Fail fast here rather than later, when the null configuration would be merged into the driver configuration.
+            if (cancelSignalConfiguration == null)
+            {
+                throw new System.ArgumentNullException("cancelSignalConfiguration");
+            }
+
+            _jobCancellationConfiguration = cancelSignalConfiguration;
+            return this;
+        }
+
+        /// <summary>
         /// Instantiate the IMRUJobDefinition.
         /// </summary>
         /// <returns>The IMRUJobDefintion configured.</returns>
@@ -355,6 +368,7 @@ namespace Org.Apache.REEF.IMRU.API
                 _mapInputPipelineDataConverterConfiguration,
                 _partitionedDatasetConfiguration,
                 _resultHandlerConfiguration,
+                _jobCancellationConfiguration,
                 _perMapConfigGeneratorConfig,
                 _numberOfMappers,
                 _memoryPerMapper,

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 8ba132e..a39762e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -121,7 +121,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                     .BindImplementation(GenericType<IGroupCommDriver>.Class, GenericType<GroupCommDriver>.Class)
                     .Build(),
                 jobDefinition.PartitionedDatasetConfiguration,
-                overallPerMapConfig
+                overallPerMapConfig,
+                jobDefinition.JobCancelSignalConfiguration
             })
                 .BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
                     _configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IJobLifeCycleManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IJobLifeCycleManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IJobLifeCycleManager.cs
new file mode 100644
index 0000000..ac5cfe1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IJobLifeCycleManager.cs
@@ -0,0 +1,33 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Job lifecycle manager used by the IMRU driver.
+    /// The driver subscribes to it in order to receive IJobCancelled events.
+    /// </summary>
+    [DefaultImplementation(typeof(JobLifeCycleManager))]
+    internal interface IJobLifecycleManager : IObservable<IJobCancelled>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 3284859..c82dd2d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -1,4 +1,4 @@
-\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -66,7 +66,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         IObserver<IFailedContext>,
         IObserver<IFailedTask>,
         IObserver<IRunningTask>,
-        IObserver<IEnumerable<IActiveContext>>
+        IObserver<IEnumerable<IActiveContext>>,
+        IObserver<IJobCancelled>
     {
         private static readonly Logger Logger =
             Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
@@ -82,6 +83,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
         private readonly bool _invokeGC;
         private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider;
+        private IJobCancelled _cancelEvent;
 
         /// <summary>
         /// The lock for the driver. 
@@ -124,6 +126,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// </summary>
         private int _numberOfRetries;
 
+        /// <summary>
+        /// Manages lifecycle events for driver, like JobCancelled event.
+        /// </summary>
+        private readonly List<IDisposable> _disposableResources = new List<IDisposable>();
+
         private const int DefaultMaxNumberOfRetryInRecovery = 3; 
 
         [Inject]
@@ -139,7 +146,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
             [Parameter(typeof(InvokeGC))] bool invokeGC,
             IGroupCommDriver groupCommDriver,
-            INameServer nameServer)
+            INameServer nameServer,
+            IJobLifecycleManager lifecycleManager)
         {
             _configurationManager = configurationManager;
             _groupCommDriver = groupCommDriver;
@@ -161,6 +169,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             _serviceAndContextConfigurationProvider =
                 new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet, configurationManager);
 
+            if (lifecycleManager != null)
+            {
+                var handle = lifecycleManager.Subscribe(this as IObserver<IJobCancelled>);
+                _disposableResources.Add(handle);
+            }
+            
             var msg =
                 string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.",
                     memoryPerMapper,
@@ -521,9 +535,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                             {
                                 var reason1 = _evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()
                                     ? "it exceeded MaximumNumberOfEvaluatorFailures, "
-                                    : "";
-                                var reason2 = isMaster ? "master evaluator failed, " : "";
-                                Logger.Log(Level.Error, "The system is not recoverable because " +  reason1 + reason2 + " changing the system state to Fail.");
+                                    : string.Empty;
+                                var reason2 = isMaster ? "master evaluator failed, " : string.Empty;
+                                Logger.Log(Level.Error, "The system is not recoverable because " + reason1 + reason2 + " changing the system state to Fail.");
                                 _systemState.MoveNext(SystemStateEvent.NotRecoverable);
                                 FailAction();
                             }
@@ -559,6 +573,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                             break;
 
                         case SystemState.Fail:
+                            FailAction();
                             break;
 
                         default:
@@ -650,6 +665,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
         #endregion IFailedTask
 
+        /// <summary>
+        /// Handles a job cancellation event from the job lifecycle manager.
+        /// It records the event, moves the system state with NotRecoverable and runs FailAction.
+        /// FailAction shuts down all evaluators and raises an ApplicationException carrying the
+        /// cancellation timestamp and message.
+        /// Only the first event is acted on. The lifecycle manager may deliver further events while the
+        /// detector keeps reporting the cancellation. Re-running the NotRecoverable transition and the
+        /// shutdown for those would repeat work that has already been done.
+        /// </summary>
+        /// <param name="value">The cancellation event.</param>
+        public void OnNext(IJobCancelled value)
+        {
+            lock (_lock)
+            {
+                if (_cancelEvent != null)
+                {
+                    Logger.Log(Level.Info, "Ignoring repeated job cancellation event: the job cancellation is already being processed.");
+                    return;
+                }
+
+                _cancelEvent = value;
+                _systemState.MoveNext(SystemStateEvent.NotRecoverable);
+                FailAction();
+            }
+        }
+
         public void OnError(Exception error)
         {
         }
@@ -705,6 +730,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             ShutDownAllEvaluators();
             Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
+            DisposeResources();
         }
 
         /// <summary>
@@ -713,10 +739,40 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private void FailAction()
         {
             ShutDownAllEvaluators();
-            var msg = string.Format(CultureInfo.InvariantCulture,
-                "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.",
-                FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers(), _evaluatorManager.IsMasterEvaluatorFailed());
-            Exceptions.Throw(new ApplicationException(msg), Logger);
+            
+            // A recorded job cancellation (_cancelEvent, set by OnNext(IJobCancelled)) takes precedence.
+            // In that case report when and why the job was cancelled instead of the retry/failed-mapper summary.
+            var failMessage = _cancelEvent != null
+                    ? string.Format(CultureInfo.InvariantCulture,
+                        "{0} Job cancelled at {1}. cancellation message: {2}",
+                        FailActionPrefix, _cancelEvent.Timestamp.ToString("u"), _cancelEvent.Message)
+                    : string.Format(CultureInfo.InvariantCulture,
+                        "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.",
+                        FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers(), _evaluatorManager.IsMasterEvaluatorFailed());
+
+            // Release registered resources, such as the job lifecycle manager subscription,
+            // before raising the failure through Exceptions.Throw.
+            DisposeResources();
+            Exceptions.Throw(new ApplicationException(failMessage), Logger);
+        }
+
+        /// <summary>
+        /// Disposes every handle registered in _disposableResources, such as the job lifecycle manager
+        /// subscription. Individual Dispose failures are logged rather than propagated. Afterwards the
+        /// list is emptied, so later calls have nothing left to dispose.
+        /// </summary>
+        private void DisposeResources()
+        {
+            lock (_disposableResources)
+            {
+                foreach (var resource in _disposableResources)
+                {
+                    try
+                    {
+                        resource.Dispose();
+                    }
+                    catch (Exception ex)
+                    {
+                        Logger.Log(Level.Error, "Failed to dispose a resource: {0}", ex);
+                    }
+                }
+
+                _disposableResources.Clear();
+            }
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancellationDetectorAlwaysFalse.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancellationDetectorAlwaysFalse.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancellationDetectorAlwaysFalse.cs
new file mode 100644
index 0000000..1d144af
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancellationDetectorAlwaysFalse.cs
@@ -0,0 +1,39 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Default IJobCancelledDetector, used when the job definition does not configure one.
+    /// It never reports a cancellation.
+    /// </summary>
+    internal sealed class JobCancellationDetectorAlwaysFalse : IJobCancelledDetector
+    {
+        [Inject]
+        public JobCancellationDetectorAlwaysFalse()
+        {
+        }
+
+        /// <summary>
+        /// Always reports that the job is not cancelled.
+        /// </summary>
+        /// <param name="cancellationMessage">Always set to null.</param>
+        /// <returns>Always false.</returns>
+        public bool IsJobCancelled(out string cancellationMessage)
+        {
+            cancellationMessage = null;
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancelled.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancelled.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancelled.cs
new file mode 100644
index 0000000..6b13ae9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobCancelled.cs
@@ -0,0 +1,39 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Driver;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Immutable job cancelled event.
+    /// Carries the timestamp and the cancellation message supplied by whoever raised it.
+    /// </summary>
+    internal sealed class JobCancelled : IJobCancelled
+    {
+        private readonly DateTime _timestamp;
+        private readonly string _message;
+
+        /// <summary>
+        /// Creates the event. Neither argument is validated.
+        /// </summary>
+        /// <param name="timestamp">Time associated with the cancellation.</param>
+        /// <param name="message">Cancellation message; stored as given, so it may be null.</param>
+        internal JobCancelled(DateTime timestamp, string message)
+        {
+            _message = message;
+            _timestamp = timestamp;
+        }
+
+        /// <summary>
+        /// Time associated with the cancellation, as passed to the constructor.
+        /// </summary>
+        public DateTime Timestamp
+        {
+            get { return _timestamp; }
+        }
+
+        /// <summary>
+        /// Cancellation message, as passed to the constructor.
+        /// </summary>
+        public string Message
+        {
+            get { return _message; }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobLifeCycleManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobLifeCycleManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobLifeCycleManager.cs
new file mode 100644
index 0000000..599e6d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/JobLifeCycleManager.cs
@@ -0,0 +1,252 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Timers;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// JobLifecycleManager orchestrates the job cancellation flow.
+    /// If a job cancellation detector is configured in the job definition, the manager starts a timer and periodically checks for a cancellation signal.
+    /// If a cancellation signal is detected, the manager creates a JobCancelled event and propagates it to all subscribers.
+    /// The manager is used by the IMRU driver to enable job cancellation based on the job cancellation configuration in the job definition.
+    /// </summary>
+    /// <remarks>
+    /// No finalizer is declared: the only owned resource is a managed Timer, which handles its own finalization.
+    /// </remarks>
+    internal sealed class JobLifeCycleManager :
+        IDisposable,
+        IJobLifecycleManager
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(JobLifeCycleManager));
+
+        private Timer _timer;
+        private readonly int _timerIntervalSec;
+        private readonly IJobCancelledDetector _cancellationDetector;
+
+        // Guards every access to _timer after construction, so Start/Stop can never race with Dispose.
+        private readonly object _timerLock = new object();
+        private readonly List<IObserver<IJobCancelled>> _observers = new List<IObserver<IJobCancelled>>();
+
+        [Inject]
+        private JobLifeCycleManager(
+            IJobCancelledDetector cancelletionDetector,
+            [Parameter(typeof(SleepIntervalParameter))] int sleepIntervalSec)
+        {
+            _cancellationDetector = cancelletionDetector;
+            _timerIntervalSec = sleepIntervalSec;
+            InitTimer();
+        }
+
+        /// <summary>
+        /// Creates the polling timer, unless polling would be pointless.
+        /// Polling is skipped when there is no detector, when the detector is the always-false default,
+        /// or when the interval is not positive. The timer is created stopped; it starts when the first observer subscribes.
+        /// </summary>
+        private void InitTimer()
+        {
+            if (_cancellationDetector == null)
+            {
+                Logger.Log(Level.Info, "Cancellation detector is null - no need to start Timer for job lifecycle manager");
+                return;
+            }
+
+            if (_cancellationDetector is JobCancellationDetectorAlwaysFalse)
+            {
+                Logger.Log(Level.Info, "Cancellation detector is default - no need to start Timer for job lifecycle manager.");
+                return;
+            }
+
+            if (_timerIntervalSec <= 0)
+            {
+                Logger.Log(Level.Info, "Timer interval ({0}) is not positive - can't start Timer for job lifecycle manager.", _timerIntervalSec);
+                return;
+            }
+
+            Logger.Log(Level.Info, "initializing timer to monitor job status. _timer interval: {0}, cancellation detector: {1}", _timerIntervalSec, _cancellationDetector);
+
+            // start timer to monitor cancellation signal
+            _timer = new Timer(_timerIntervalSec * 1000);
+            _timer.Elapsed += OnTimer;
+            _timer.AutoReset = true;
+        }
+
+        /// <summary>
+        /// Registers an observer for JobCancelled events and makes sure the polling timer is running.
+        /// Subscribing the same observer twice has no additional effect.
+        /// </summary>
+        /// <param name="observer">Observer to notify; must not be null.</param>
+        /// <returns>A handle whose Dispose removes the observer again.</returns>
+        /// <exception cref="ArgumentNullException">If <paramref name="observer"/> is null.</exception>
+        public IDisposable Subscribe(IObserver<IJobCancelled> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            Logger.Log(Level.Info, "Adding subscriber: {0}", observer);
+
+            lock (_observers)
+            {
+                if (!_observers.Contains(observer))
+                {
+                    _observers.Add(observer);
+                    EnsureTimerStarted();
+                }
+            }
+
+            return new AnonymousDisposable(() => Unsubscribe(observer));
+        }
+
+        /// <summary>
+        /// Stops and releases the polling timer. Safe to call more than once.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_timerLock)
+            {
+                if (_timer != null)
+                {
+                    _timer.Stop();
+                    _timer.Elapsed -= OnTimer;
+                    _timer.Dispose();
+                    _timer = null;
+                }
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Removes an observer. When the last observer is removed, the polling timer is stopped.
+        /// </summary>
+        private void Unsubscribe(IObserver<IJobCancelled> observer)
+        {
+            Logger.Log(Level.Info, "Removing subscriber: {0}", observer);
+
+            lock (_observers)
+            {
+                _observers.Remove(observer);
+                if (_observers.Count == 0)
+                {
+                    EnsureTimerStopped();
+                }
+            }
+        }
+
+        private void EnsureTimerStarted()
+        {
+            lock (_timerLock)
+            {
+                Logger.Log(Level.Info, "Ensure Timer STARTED. Current timer enabled state: {0}", GetTimerEnabledState());
+
+                // _timer is null if initialization decided polling is not needed (corresponding logs were written)
+                // or after Dispose; in both cases starting is skipped.
+                if (_timer != null && !_timer.Enabled)
+                {
+                    _timer.Start();
+                    Logger.Log(Level.Info, "Timer started");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Describes the timer state for logging. Callers hold _timerLock.
+        /// </summary>
+        private string GetTimerEnabledState()
+        {
+            return _timer == null ? "timer is null" : _timer.Enabled.ToString();
+        }
+
+        private void EnsureTimerStopped()
+        {
+            lock (_timerLock)
+            {
+                Logger.Log(Level.Info, "Ensure Timer STOPPED. Current timer enabled state: {0}", GetTimerEnabledState());
+
+                if (_timer != null)
+                {
+                    _timer.Stop();
+                    Logger.Log(Level.Info, "Timer stopped");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Timer callback: polls the detector and, on cancellation, sends a JobCancelled event to every current observer.
+        /// </summary>
+        private void OnTimer(object source, ElapsedEventArgs e)
+        {
+            bool hasObservers;
+            lock (_observers)
+            {
+                hasObservers = _observers.Count > 0;
+            }
+
+            if (!hasObservers)
+            {
+                Logger.Log(Level.Info,
+                    "There are no observers for cancellation event: skipping cancellation detection");
+                return;
+            }
+
+            string cancellationMessage;
+            if (!IsJobCancelled(out cancellationMessage))
+            {
+                return;
+            }
+
+            // Snapshot under the lock. Logging and notification then iterate the copy, so concurrent
+            // Subscribe/Unsubscribe calls, or an observer unsubscribing from inside OnNext, cannot
+            // invalidate the enumeration.
+            IObserver<IJobCancelled>[] observers;
+            lock (_observers)
+            {
+                observers = _observers.ToArray();
+            }
+
+            Logger.Log(
+                Level.Info,
+                "Detected Job cancellation ({0}): sending JobCancelled event to observers: {1}",
+                cancellationMessage,
+                ToCsvString(observers));
+
+            var cancelEvent = new JobCancelled(DateTime.Now, cancellationMessage);
+
+            foreach (var observer in observers)
+            {
+                observer.OnNext(cancelEvent);
+            }
+        }
+
+        /// <summary>
+        /// Joins at most the first 10 items with commas for logging; null items print as "null".
+        /// </summary>
+        private static string ToCsvString<T>(IEnumerable<T> list)
+        {
+            if (list == null)
+            {
+                return "null";
+            }
+
+            return string.Join(",", list.Take(10).Select(m => m == null ? "null" : m.ToString()));
+        }
+
+        /// <summary>
+        /// Queries the detector. Exceptions from the detector are logged and treated as "not cancelled".
+        /// </summary>
+        private bool IsJobCancelled(out string cancellationMessage)
+        {
+            var isCancelled = false;
+            cancellationMessage = null;
+
+            try
+            {
+                isCancelled = _cancellationDetector != null && _cancellationDetector.IsJobCancelled(out cancellationMessage);
+            }
+            catch (Exception ex)
+            {
+                Logger.Log(Level.Error, "IsCancelled check failed. Exception:{0}", ex);
+                isCancelled = false;
+            }
+
+            return isCancelled;
+        }
+
+        /// <summary>
+        /// IDisposable that runs the supplied action when disposed.
+        /// </summary>
+        private class AnonymousDisposable : IDisposable
+        {
+            private Action DisposeAction { get; set; }
+
+            public AnonymousDisposable(Action disposeAction)
+            {
+                DisposeAction = disposeAction;
+            }
+
+            public void Dispose()
+            {
+                DisposeAction();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SleepIntervalParameter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SleepIntervalParameter.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SleepIntervalParameter.cs
new file mode 100644
index 0000000..41bae30
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SleepIntervalParameter.cs
@@ -0,0 +1,26 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    /// <summary>
+    /// Interval, in seconds, between job cancellation checks made by the job lifecycle manager (default "15").
+    /// A non-positive value prevents the manager from starting its polling timer.
+    /// </summary>
+    [NamedParameter("Sleep interval for Job lifecycle manager", "lifecyclesleepintervalSec", "15")]
+    public sealed class SleepIntervalParameter : Name<int>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index f4da45f..f6161d7 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -46,6 +46,7 @@ under the License.
   <ItemGroup>
     <Compile Include="API\IIMRUClient.cs" />
     <Compile Include="API\IIMRUResultHandler.cs" />
+    <Compile Include="API\IJobCancelledDetector.cs" />
     <Compile Include="API\IMapFunction.cs" />
     <Compile Include="API\IMRUCodecConfiguration.cs" />
     <Compile Include="API\IMRUPipelineDataConverterConfiguration.cs" />
@@ -57,6 +58,7 @@ under the License.
     <Compile Include="API\IMRUUpdateConfiguration.cs" />
     <Compile Include="API\IUpdateFunction.cs" />
     <Compile Include="API\IPerMapperConfigGenerator.cs" />
+    <Compile Include="OnREEF\Driver\JobCancellationDetectorAlwaysFalse.cs" />
     <Compile Include="API\PerMapConfigGeneratorSet.cs" />
     <Compile Include="API\UpdateResult.cs" />
     <Compile Include="InProcess\IMRURunner.cs" />
@@ -74,6 +76,9 @@ under the License.
     <Compile Include="OnREEF\Driver\DataLoadingContext.cs" />
     <Compile Include="OnREEF\Driver\EvaluatorManager.cs" />
     <Compile Include="OnREEF\Driver\EvaluatorSpecification.cs" />
+    <Compile Include="OnREEF\Driver\IJobLifeCycleManager.cs" />
+    <Compile Include="OnREEF\Driver\JobCancelled.cs" />
+    <Compile Include="OnREEF\Driver\JobLifeCycleManager.cs" />
     <Compile Include="OnREEF\Driver\MaximumNumberOfEvalutorFailuresExceededException.cs" />
     <Compile Include="OnREEF\Driver\IMRUSystemException.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
@@ -120,6 +125,7 @@ under the License.
     <Compile Include="OnREEF\Parameters\SerializedUpdateConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedUpdateTaskStateConfiguration.cs" />
+    <Compile Include="OnREEF\Parameters\SleepIntervalParameter.cs" />
     <Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
     <Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
     <Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tang/Interface/IConfigurationBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Interface/IConfigurationBuilder.cs b/lang/cs/Org.Apache.REEF.Tang/Interface/IConfigurationBuilder.cs
index 5b80dfa..221f1ea 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Interface/IConfigurationBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Interface/IConfigurationBuilder.cs
@@ -22,7 +22,7 @@ namespace Org.Apache.REEF.Tang.Interface
 {
     public interface IConfigurationBuilder
     {
-        void AddConfiguration(IConfiguration c); 
+        void AddConfiguration(IConfiguration c);
         IClassHierarchy GetClassHierarchy();
         IConfiguration Build();
         void Bind(string iface, string impl);

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceJobCancelledTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceJobCancelledTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceJobCancelledTest.cs
new file mode 100644
index 0000000..d4940c2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceJobCancelledTest.cs
@@ -0,0 +1,79 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Diagnostics;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class IMRUBroadcastReduceJobCancelledTest : IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// Cancellation scenario for IMRUDriver on the local runtime.
+        /// The job is configured with zero checks before cancelling, so the test detector reports cancellation on its
+        /// first check. The driver log is then searched for the expected cancellation message.
+        /// The number of closed contexts is not verified, because null is passed for it.
+        /// </summary>
+        [Fact]
+        public void TestBroadcastReduceJobWithJobCancellation()
+        {
+            const int numTasks = 14;
+            const int chunkSize = 2;
+            const int dims = 10;
+            const int iterations = 30;
+            const int mapperMemory = 5120;
+            const int updateTaskMemory = 5120;
+            const int numberOfRetryInRecovery = 4;
+            const int checksBeforeCancelling = 0;
+            const string expectedMessage = "cancellation message: IsCancelled check count: 1";
+
+            var testFolder = DefaultRuntimeFolder + TestId;
+
+            TestBroadCastAndReduce(
+                false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                numberOfRetryInRecovery,
+                testFolder,
+                numberOfChecksBeforeCancellingJob: checksBeforeCancelling);
+
+            ValidateSuccessForLocalRuntime(null, 0, 0, testFolder, expectedCancellationMessage: expectedMessage);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Driver event handler configuration for this test.
+        /// Every listed driver event is routed to IMRUDriver, and the custom trace level is set to Info.
+        /// </summary>
+        /// <typeparam name="TMapInput">Map input type.</typeparam>
+        /// <typeparam name="TMapOutput">Map output type.</typeparam>
+        /// <typeparam name="TResult">Result type.</typeparam>
+        /// <typeparam name="TPartitionType">Partition type.</typeparam>
+        /// <returns>The driver event handler configuration.</returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            var imruDriver = GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class;
+
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
+                .Build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
index a7a067f..c68517c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
@@ -84,7 +84,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <summary>
         /// This method overrides base class method to pass IEnumerable<Row> as TPartitionType
         /// </summary>
-        protected new void TestBroadCastAndReduce(bool runOnYarn,
+        protected void TestBroadCastAndReduce(bool runOnYarn,
             int numTasks,
             int chunkSize,
             int dims,

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index 8964836..dae056f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Diagnostics;
 using System.Globalization;
 using System.IO;
 using System.Linq;
@@ -80,6 +81,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="numberOfRetryInRecovery"></param>
         /// <param name="updateTaskMemory"></param>
         /// <param name="testFolder"></param>
+        /// <param name="numberOfChecksBeforeCancellingJob"></param>
         protected void TestBroadCastAndReduce(bool runOnYarn,
             int numTasks,
             int chunkSize,
@@ -88,11 +90,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int mapperMemory,
             int updateTaskMemory,
             int numberOfRetryInRecovery = 0,
-            string testFolder = DefaultRuntimeFolder)
+            string testFolder = DefaultRuntimeFolder,
+            int? numberOfChecksBeforeCancellingJob = null)
         {
             string runPlatform = runOnYarn ? "yarn" : "local";
             TestRun(DriverConfiguration<int[], int[], int[], Stream>(
-                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery),
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, numberOfChecksBeforeCancellingJob),
                 DriverEventHandlerConfigurations<int[], int[], int[], Stream>()),
                 typeof(BroadcastReduceDriver),
                 numTasks,
@@ -134,7 +137,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>(jobDefinition.NumberOfMappers + 1,
                     driverId),
                 jobDefinition.PartitionedDatasetConfiguration,
-                overallPerMapConfig
+                overallPerMapConfig,
+                jobDefinition.JobCancelSignalConfiguration
             })
                 .BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
                     configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
@@ -206,6 +210,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="mapperMemory"></param>
         /// <param name="updateTaskMemory"></param>
         /// <param name="numberOfRetryInRecovery"></param>
+        /// <param name="numberOfChecksBeforeCancellingJob"></param>
         /// <returns></returns>
         protected virtual IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
             int chunkSize,
@@ -213,9 +218,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int dim,
             int mapperMemory,
             int updateTaskMemory,
-            int numberOfRetryInRecovery)
+            int numberOfRetryInRecovery,
+            int? numberOfChecksBeforeCancellingJob = null)
         {
-            return new IMRUJobDefinitionBuilder()
+            var builder = new IMRUJobDefinitionBuilder()
                 .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
                 .SetUpdateFunctionConfiguration(BuildUpdateFunctionConfiguration(numberofMappers, numIterations, dim))
                 .SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
@@ -228,8 +234,20 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .SetNumberOfMappers(numberofMappers)
                 .SetMapperMemory(mapperMemory)
                 .SetUpdateTaskMemory(updateTaskMemory)
-                .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery)
-                .Build();
+                .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery);
+
+            if (numberOfChecksBeforeCancellingJob.HasValue)
+            {
+                var cancelConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindImplementation(GenericType<IJobCancelledDetector>.Class, GenericType<JobCancellationDetectoBasedOnCheckCount>.Class)
+                    .BindNamedParameter(typeof(JobCancellationDetectoBasedOnCheckCount.NumberOfChecksBeforeCancelling), numberOfChecksBeforeCancellingJob.Value.ToString())
+                    .BindNamedParameter(typeof(SleepIntervalParameter), "1")
+                    .Build();
+
+                builder.SetJobCancellationConfiguration(cancelConfig);
+            }
+
+            return builder.Build();
         }
 
         /// <summary>
@@ -383,5 +401,29 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 Logger.Log(Level.Info, RunningTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId);
             }
         }
+
+        /// <summary>
+        /// Test-only IJobCancelledDetector that reports cancellation once it has been checked more than
+        /// NumberOfChecksBeforeCancelling times. Every call's message carries the running check count.
+        /// </summary>
+        private sealed class JobCancellationDetectoBasedOnCheckCount : IJobCancelledDetector
+        {
+            // Updated with Interlocked: the detector is polled from timer callbacks, which can run concurrently
+            // on thread-pool threads, so a plain ++ could lose increments.
+            private int _checkCount;
+            private readonly int _numberOfChecksBeforeCancelling;
+
+            [Inject]
+            private JobCancellationDetectoBasedOnCheckCount([Parameter(typeof(NumberOfChecksBeforeCancelling))] int numberOfChecksBeforeCancelling)
+            {
+                _numberOfChecksBeforeCancelling = numberOfChecksBeforeCancelling;
+            }
+
+            /// <summary>
+            /// Increments the check count and reports cancellation once it exceeds the configured threshold.
+            /// </summary>
+            /// <param name="cancellationMessage">Set to "IsCancelled check count: N" on every call.</param>
+            /// <returns>True when the check count is greater than the configured number of checks.</returns>
+            public bool IsJobCancelled(out string cancellationMessage)
+            {
+                var checkCount = System.Threading.Interlocked.Increment(ref _checkCount);
+                cancellationMessage = "IsCancelled check count: " + checkCount;
+                return checkCount > _numberOfChecksBeforeCancelling;
+            }
+
+            [NamedParameter("Number of IsCancelled checks before cancelling job", "numberOfChecksBeforeCancelling", "0")]
+            internal sealed class NumberOfChecksBeforeCancelling : Name<int>
+            {
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index 1fb08ad..ebeb2c6 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -135,6 +135,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="mapperMemory"></param>
         /// <param name="updateTaskMemory"></param>
         /// <param name="numberOfRetryInRecovery"></param>
+        /// <param name="numberOfChecksBeforeCancellingJob"></param>
         /// <returns></returns>
         protected override IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
             int chunkSize,
@@ -142,7 +143,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int dim,
             int mapperMemory,
             int updateTaskMemory,
-            int numberOfRetryInRecovery)
+            int numberOfRetryInRecovery,
+            int? numberOfChecksBeforeCancellingJob = null)
         {
             return new IMRUJobDefinitionBuilder()
                 .SetUpdateTaskStateConfiguration(UpdateTaskStateConfiguration())

http://git-wip-us.apache.org/repos/asf/reef/blob/1fd4f265/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 6cd848f..53d9e7b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -155,7 +155,7 @@ namespace Org.Apache.REEF.Tests.Functional
             CleanUp();
         }
 
-        protected void ValidateSuccessForLocalRuntime(int numberOfContextsToClose, int numberOfTasksToFail = 0, int numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder, int retryCount = 60)
+        protected void ValidateSuccessForLocalRuntime(int? numberOfContextsToClose, int numberOfTasksToFail = 0, int numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder, int retryCount = 60, string expectedCancellationMessage = null)
         {
             const string successIndication = "EXIT: ActiveContextClr2Java::Close";
             const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
@@ -166,12 +166,17 @@ namespace Org.Apache.REEF.Tests.Functional
             string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray();
             string[] failedTaskIndicators = lines.Where(s => s.Contains(failedTaskIndication)).ToArray();
             string[] failedEvaluatorIndicators = lines.Where(s => s.Contains(failedEvaluatorIndication)).ToArray();
-            Assert.True(numberOfContextsToClose == successIndicators.Length,
+            Assert.True(!numberOfContextsToClose.HasValue || numberOfContextsToClose == successIndicators.Length,
                 "Expected number of contexts to close (" + numberOfContextsToClose + ") differs from actual number of success indicators (" + successIndicators.Length + ")");
             Assert.True(numberOfTasksToFail == failedTaskIndicators.Length,
                 "Expected number of tasks to fail (" + numberOfTasksToFail + ") differs from actual number of failed task indicators (" + failedTaskIndicators.Length + ")");
             Assert.True(numberOfEvaluatorsToFail == failedEvaluatorIndicators.Length,
                 "Expected number of evaluators to fail (" + numberOfEvaluatorsToFail + ") differs from actual number of failed evaluator indicators (" + failedEvaluatorIndicators.Length + ")");
+
+            if (!string.IsNullOrWhiteSpace(expectedCancellationMessage))
+            {
+                Assert.True(lines.Any(line => line.Contains(expectedCancellationMessage)), "Did not find job cancellation message in log file. Expected message: " + expectedCancellationMessage);
+            }
         }
 
         /// <summary>