You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@reef.apache.org by we...@apache.org on 2016/09/28 00:07:35 UTC
reef git commit: [REEF-1451] Clean up IMRU Fault Tolerant scenario tests
Repository: reef
Updated Branches:
refs/heads/master 1add8399e -> 3ea89be97
[REEF-1451] Clean up IMRU Fault Tolerant scenario tests
This change:
* extends the failures simulated via FailureType to include failures during task dispose.
* adds tests for mapper task failure on init and on dispose,
and for mapper evaluator failure on dispose.
* cleans up checks and comments in the other IMRU fault-tolerant (FT) scenario tests.
JIRA:
[REEF-1451](https://issues.apache.org/jira/browse/REEF-1451)
Pull request:
This closes #1132
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3ea89be9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3ea89be9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3ea89be9
Branch: refs/heads/master
Commit: 3ea89be97c20a440bba84c8c33338ee8e480875d
Parents: 1add839
Author: Mariia Mykhailova <ma...@apache.org>
Authored: Mon Sep 26 11:53:29 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Sep 27 17:06:11 2016 -0700
----------------------------------------------------------------------
.../FaultTolerantPipelinedBroadcastAndReduce.cs | 24 +++--
.../OnREEF/Driver/IMRUDriver.cs | 9 +-
.../Functional/IMRU/TestFailMapperEvaluators.cs | 7 +-
.../IMRU/TestFailMapperEvaluatorsOnDispose.cs | 91 +++++++++++++++++++
.../IMRU/TestFailMapperEvaluatorsOnInit.cs | 17 ++--
.../Functional/IMRU/TestFailMapperTasks.cs | 11 ++-
.../IMRU/TestFailMapperTasksOnDispose.cs | 90 +++++++++++++++++++
.../IMRU/TestFailMapperTasksOnInit.cs | 95 ++++++++++++++++++++
.../Functional/IMRU/TestFailUpdateEvaluator.cs | 5 ++
.../Org.Apache.REEF.Tests.csproj | 3 +
10 files changed, 330 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
index f0e563d..1b7b15f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
@@ -87,15 +87,18 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
[NamedParameter(Documentation = "Type of failure to simulate")]
internal class FailureType : Name<int>
{
- internal static readonly int EvaluatorFailureDuringTaskExecution = 0;
- internal static readonly int TaskFailureDuringTaskExecution = 1;
- internal static readonly int EvaluatorFailureDuringTaskInitialization = 2;
- internal static readonly int TaskFailureDuringTaskInitialization = 3;
+ internal const int EvaluatorFailureDuringTaskExecution = 0;
+ internal const int TaskFailureDuringTaskExecution = 1;
+ internal const int EvaluatorFailureDuringTaskInitialization = 2;
+ internal const int TaskFailureDuringTaskInitialization = 3;
+ internal const int EvaluatorFailureDuringTaskDispose = 4;
+ internal const int TaskFailureDuringTaskDispose = 5;
internal static bool IsEvaluatorFailure(int failureType)
{
return failureType == EvaluatorFailureDuringTaskExecution ||
- failureType == EvaluatorFailureDuringTaskInitialization;
+ failureType == EvaluatorFailureDuringTaskInitialization ||
+ failureType == EvaluatorFailureDuringTaskDispose;
}
}
@@ -107,7 +110,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// The function is to simulate Evaluator/Task failure for mapper evaluator
/// </summary>
- internal sealed class TestSenderMapFunction : IMapFunction<int[], int[]>
+ internal sealed class TestSenderMapFunction : IMapFunction<int[], int[]>, IDisposable
{
private int _iterations;
private readonly string _taskId;
@@ -180,6 +183,15 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
return mapInput;
}
+ public void Dispose()
+ {
+ if (_failureType == FailureType.EvaluatorFailureDuringTaskDispose ||
+ _failureType == FailureType.TaskFailureDuringTaskDispose)
+ {
+ SimulateFailure(_iterations);
+ }
+ }
+
private void SimulateFailure(int onIteration)
{
if (_iterations == onIteration &&
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index dafba71..50424c7 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -71,6 +71,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private static readonly Logger Logger =
Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
+ internal const string DoneActionPrefix = "DoneAction:";
+ internal const string FailActionPrefix = "FailAction:";
+
private readonly ConfigurationManager _configurationManager;
private readonly int _totalMappers;
private readonly IGroupCommDriver _groupCommDriver;
@@ -699,7 +702,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private void DoneAction()
{
ShutDownAllEvaluators();
- Logger.Log(Level.Info, "DoneAction done in retry {0}!!!", _numberOfRetries);
+ Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
}
/// <summary>
@@ -709,8 +712,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
ShutDownAllEvaluators();
var msg = string.Format(CultureInfo.InvariantCulture,
- "The system cannot be recovered after {0} retries. NumberofFailedMappers in the last try is {1}.",
- _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
+ "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}.",
+ FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
Exceptions.Throw(new ApplicationException(msg), Logger);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index a172908..5e53f5d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -38,8 +38,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
protected const int NumberOfRetry = 3;
/// <summary>
- /// This test is to fail one evaluator and then try to resubmit. In the last retry,
- /// there will be no failed evaluator and all tasks will be successfully completed.
+ /// This test fails two evaluators during task execution stage on each retry except last.
+ /// Job is retried until success.
/// </summary>
[Fact]
public virtual void TestFailedMapperOnLocalRuntime()
@@ -65,11 +65,14 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
// on each try each task should fail or complete or disappear with failed evaluator
// and on each try all tasks should start successfully
Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+ // eventually job succeeds
+ Assert.Equal(1, jobSuccess);
CleanUp(testFolder);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
new file mode 100644
index 0000000..c3e6657
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnDispose.cs
@@ -0,0 +1,91 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+ [Collection("FunctionalTests")]
+ public sealed class TestFailMapperEvaluatorsOnDispose : TestFailMapperEvaluators
+ {
+ /// <summary>
+ /// This test fails two evaluators during task dispose stage.
+ /// The failures are ignored, because tasks are already completed successfully.
+ /// </summary>
+ [Fact]
+ public override void TestFailedMapperOnLocalRuntime()
+ {
+ int chunkSize = 2;
+ int dims = 100;
+ int iterations = 200;
+ int mapperMemory = 5120;
+ int updateTaskMemory = 5120;
+ int numTasks = 9;
+ string testFolder = DefaultRuntimeFolder + TestId;
+ TestBroadCastAndReduce(false,
+ numTasks,
+ chunkSize,
+ dims,
+ iterations,
+ mapperMemory,
+ updateTaskMemory,
+ NumberOfRetry,
+ testFolder);
+ string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+ var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+ var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+ var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+ // In first retry, all tasks are completed and then there are 2 failed evaluators.
+ // No failed tasks.
+ Assert.Equal(2, failedEvaluatorCount);
+ Assert.Equal(0, failedTaskCount);
+ Assert.Equal(numTasks, completedTaskCount);
+
+ // eventually job succeeds
+ Assert.Equal(1, jobSuccess);
+ CleanUp(testFolder);
+ }
+
+ protected override IConfiguration BuildMapperFunctionConfig()
+ {
+ var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+ .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+ GenericType<TestSenderMapFunction>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewConfigurationBuilder(c)
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+ .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskDispose.ToString())
+ .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+ .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+ .Build();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index 2dfd593..5827e80 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -17,6 +17,7 @@
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
@@ -29,11 +30,11 @@ using Xunit;
namespace Org.Apache.REEF.Tests.Functional.IMRU
{
[Collection("FunctionalTests")]
- public class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
+ public sealed class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
{
/// <summary>
- /// This test is to throw exceptions in two tasks. In the first try, there is task app failure,
- /// and no retries will be done.
+ /// This test fails two evaluators during task initialize stage on each retry except last.
+ /// Job is retried until success.
/// </summary>
[Fact]
public override void TestFailedMapperOnLocalRuntime()
@@ -58,20 +59,20 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
// In each retry, there are 2 failed evaluators.
- // The running tasks should receive cancellation and return properly. There will be no failed task.
+ // There will be no failed task.
// Rest of the tasks should be canceled and send completed task event to the driver.
Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount);
Assert.Equal(0, failedTaskCount);
Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+
+ // eventually job succeeds
+ Assert.Equal(1, jobSuccess);
CleanUp(testFolder);
}
- /// <summary>
- /// Mapper function configuration. Subclass can override it to have its own test function.
- /// </summary>
- /// <returns></returns>
protected override IConfiguration BuildMapperFunctionConfig()
{
var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
index 7fec0b7..5915c52 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -17,6 +17,7 @@
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
@@ -29,11 +30,11 @@ using Xunit;
namespace Org.Apache.REEF.Tests.Functional.IMRU
{
[Collection("FunctionalTests")]
- public class TestFailMapperTasks : TestFailMapperEvaluators
+ public sealed class TestFailMapperTasks : TestFailMapperEvaluators
{
/// <summary>
- /// This test is to throw exceptions in two tasks. In the first try, there is task app failure,
- /// and no retries will be done.
+ /// This test throws exception in two tasks during task execution stage.
+ /// This is classified as task app failure, so no retries are done, and job fails.
/// </summary>
[Fact]
public override void TestFailedMapperOnLocalRuntime()
@@ -59,6 +60,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.FailActionPrefix);
// each task should fail or complete
// there should be no failed evaluators
@@ -67,6 +69,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
Assert.Equal(numTasks, completedTaskCount + failedTaskCount);
Assert.Equal(0, failedEvaluatorCount);
Assert.Equal(numTasks, runningTaskCount);
+
+ // job fails
+ Assert.True(jobFailure > 0);
CleanUp(testFolder);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
new file mode 100644
index 0000000..e5956a7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
@@ -0,0 +1,90 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+ [Collection("FunctionalTests")]
+ public sealed class TestFailMapperTasksOnDispose : TestFailMapperEvaluators
+ {
+ /// <summary>
+ /// This test fails two tasks during task dispose stage.
+ /// The failures are ignored on core REEF layer, so no failed task events are received.
+ /// </summary>
+ [Fact]
+ public override void TestFailedMapperOnLocalRuntime()
+ {
+ int chunkSize = 2;
+ int dims = 100;
+ int iterations = 200;
+ int mapperMemory = 5120;
+ int updateTaskMemory = 5120;
+ int numTasks = 9;
+ string testFolder = DefaultRuntimeFolder + TestId;
+ TestBroadCastAndReduce(false,
+ numTasks,
+ chunkSize,
+ dims,
+ iterations,
+ mapperMemory,
+ updateTaskMemory,
+ NumberOfRetry,
+ testFolder);
+ string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+ var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+ var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+ var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+ // No failed evaluators or tasks.
+ Assert.Equal(0, failedEvaluatorCount);
+ Assert.Equal(0, failedTaskCount);
+ Assert.Equal(numTasks, completedTaskCount);
+
+ // eventually job succeeds
+ Assert.Equal(1, jobSuccess);
+ CleanUp(testFolder);
+ }
+
+ protected override IConfiguration BuildMapperFunctionConfig()
+ {
+ var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+ .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+ GenericType<TestSenderMapFunction>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewConfigurationBuilder(c)
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+ .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskDispose.ToString())
+ .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+ .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+ .Build();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
new file mode 100644
index 0000000..3e7f430
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
@@ -0,0 +1,95 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+ [Collection("FunctionalTests")]
+ public sealed class TestFailMapperTasksOnInit : TestFailMapperEvaluators
+ {
+ /// <summary>
+ /// This test throws exception in two tasks during task initialization stage.
+ /// Current exception handling code can't distinguish this from communication failure, so job is retried.
+ /// </summary>
+ [Fact]
+ public override void TestFailedMapperOnLocalRuntime()
+ {
+ int chunkSize = 2;
+ int dims = 100;
+ int iterations = 200;
+ int mapperMemory = 5120;
+ int updateTaskMemory = 5120;
+ int numTasks = 9;
+ string testFolder = DefaultRuntimeFolder + TestId;
+ TestBroadCastAndReduce(false,
+ numTasks,
+ chunkSize,
+ dims,
+ iterations,
+ mapperMemory,
+ updateTaskMemory,
+ NumberOfRetry,
+ testFolder);
+ string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+ var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+ var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+ var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix);
+
+ // In each retry, there are 2 failed tasks.
+ // Rest of the tasks should be canceled and send completed task event to the driver.
+ Assert.Equal(0, failedEvaluatorCount);
+ Assert.Equal(NumberOfRetry * 2, failedTaskCount);
+ Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+
+ // eventually job succeeds
+ Assert.Equal(1, jobSuccess);
+ CleanUp(testFolder);
+ }
+
+ /// <summary>
+ /// Mapper function configuration. Subclass can override it to have its own test function.
+ /// </summary>
+ /// <returns></returns>
+ protected override IConfiguration BuildMapperFunctionConfig()
+ {
+ var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+ .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+ GenericType<TestSenderMapFunction>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewConfigurationBuilder(c)
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+ .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+ .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskInitialization.ToString())
+ .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+ .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+ .Build();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
index cf16e25..3bd3ee0 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
@@ -36,6 +36,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
public class TestFailUpdateEvaluator : IMRUBrodcastReduceTestBase
{
private const int NumberOfRetry = 3;
+ protected const string FailActionMessage = "The system cannot be recovered after";
/// <summary>
/// This test is to fail update evaluator and then try to resubmit. We don't recover from update evaluator failure.
@@ -64,6 +65,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+ var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.FailActionPrefix);
// there should be one try with each task either completing or disappearing with failed evaluator
// no task failures
@@ -71,6 +73,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
Assert.Equal(numTasks, completedTaskCount + failedEvaluatorCount);
Assert.Equal(0, failedTaskCount);
Assert.Equal(numTasks, runningTaskCount);
+
+ // job fails
+ Assert.True(jobFailure > 0);
CleanUp(testFolder);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/3ea89be9/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 4774eb0..50a75e8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -125,7 +125,10 @@ under the License.
<Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
<Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
<Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+ <Compile Include="Functional\IMRU\TestFailMapperTasksOnDispose.cs" />
+ <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnDispose.cs" />
<Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />
+ <Compile Include="Functional\IMRU\TestFailMapperTasksOnInit.cs" />
<Compile Include="Functional\IMRU\TestFailUpdateEvaluator.cs" />
<Compile Include="Functional\IMRU\TestFailMapperTasks.cs" />
<Compile Include="Functional\IMRU\TestFailMapperEvaluators.cs" />