You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/09/02 00:27:11 UTC
[2/3] reef git commit: [REEF-1251] IMRU Driver handlers for fault
tolerant
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 58b75ed..dafba71 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -1,4 +1,4 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -18,14 +18,15 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Globalization;
using System.Linq;
-using System.Threading;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
@@ -33,60 +34,94 @@ using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
- /// Implements the IMRU driver on REEF
+ /// Implements the IMRU driver on REEF with fault tolerance
/// </summary>
/// <typeparam name="TMapInput">Map Input</typeparam>
/// <typeparam name="TMapOutput">Map output</typeparam>
/// <typeparam name="TResult">Result</typeparam>
/// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam>
- internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>
- : IObserver<IDriverStarted>,
+ internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> :
+ IObserver<IDriverStarted>,
IObserver<IAllocatedEvaluator>,
IObserver<IActiveContext>,
IObserver<ICompletedTask>,
IObserver<IFailedEvaluator>,
IObserver<IFailedContext>,
- IObserver<IFailedTask>
+ IObserver<IFailedTask>,
+ IObserver<IRunningTask>,
+ IObserver<IEnumerable<IActiveContext>>
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
private readonly ConfigurationManager _configurationManager;
private readonly int _totalMappers;
- private readonly IEvaluatorRequestor _evaluatorRequestor;
- private ICommunicationGroupDriver _commGroup;
private readonly IGroupCommDriver _groupCommDriver;
- private readonly TaskStarter _groupCommTaskStarter;
- private readonly ConcurrentStack<IConfiguration> _perMapperConfiguration;
- private readonly int _coresPerMapper;
- private readonly int _coresForUpdateTask;
- private readonly int _memoryPerMapper;
- private readonly int _memoryForUpdateTask;
+ private readonly INameServer _nameServer;
+ private ConcurrentStack<IConfiguration> _perMapperConfigurationStack;
private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
- private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>();
- private readonly int _allowedFailedEvaluators;
- private int _currentFailedEvaluators = 0;
private readonly bool _invokeGC;
- private int _numberOfReadyTasks = 0;
+ private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider;
- private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
- _serviceAndContextConfigurationProvider;
+ /// <summary>
+ /// The lock for the driver.
+ /// </summary>
+ private readonly object _lock = new object();
+
+ /// <summary>
+ /// Manages tasks, maintains task states and is responsible for task submission for the driver.
+ /// </summary>
+ private TaskManager _taskManager;
+
+ /// <summary>
+ /// Manages Active Contexts for the driver.
+ /// </summary>
+ private readonly ActiveContextManager _contextManager;
+
+ /// <summary>
+ /// Manages allocated and failed Evaluators for driver.
+ /// </summary>
+ private readonly EvaluatorManager _evaluatorManager;
+
+ /// <summary>
+ /// Defines the max retry number for recoveries. It is configurable for the driver.
+ /// </summary>
+ private readonly int _maxRetryNumberForFaultTolerant;
+
+ /// <summary>
+ /// System State of the driver.
+ /// <see href="https://issues.apache.org/jira/browse/REEF-1223"></see>
+ /// </summary>
+ private SystemStateMachine _systemState;
+
+ /// <summary>
+ /// Shows if the driver is first try. Once the system enters recovery, it is set to false.
+ /// </summary>
+ private bool _isFirstTry = true;
+
+ /// <summary>
+ /// It records the number of retry for the recoveries.
+ /// </summary>
+ private int _numberOfRetries;
+
+ private const int DefaultMaxNumberOfRetryInRecovery = 3;
[Inject]
private IMRUDriver(IPartitionedInputDataSet dataSet,
@@ -98,181 +133,517 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
[Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
[Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
[Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
+ [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
[Parameter(typeof(InvokeGC))] bool invokeGC,
- IGroupCommDriver groupCommDriver)
+ IGroupCommDriver groupCommDriver,
+ INameServer nameServer)
{
_configurationManager = configurationManager;
- _evaluatorRequestor = evaluatorRequestor;
_groupCommDriver = groupCommDriver;
- _coresPerMapper = coresPerMapper;
- _coresForUpdateTask = coresForUpdateTask;
- _memoryPerMapper = memoryPerMapper;
- _memoryForUpdateTask = memoryForUpdateTask;
+ _nameServer = nameServer;
_perMapperConfigs = perMapperConfigs;
_totalMappers = dataSet.Count;
-
- _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * dataSet.Count);
_invokeGC = invokeGC;
+ _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery > 0 ? maxRetryNumberInRecovery : DefaultMaxNumberOfRetryInRecovery;
+
+ _contextManager = new ActiveContextManager(_totalMappers + 1);
+ _contextManager.Subscribe(this);
+
+ var updateSpec = new EvaluatorSpecification(memoryForUpdateTask, coresForUpdateTask);
+ var mapperSpec = new EvaluatorSpecification(memoryPerMapper, coresPerMapper);
+ var allowedFailedEvaluators = (int)(failedEvaluatorsFraction * _totalMappers);
+ _evaluatorManager = new EvaluatorManager(_totalMappers + 1, allowedFailedEvaluators, evaluatorRequestor, updateSpec, mapperSpec);
- AddGroupCommunicationOperators();
- _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalMappers + 1);
- _perMapperConfiguration = ConstructPerMapperConfigStack(_totalMappers);
+ _systemState = new SystemStateMachine();
_serviceAndContextConfigurationProvider =
new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet);
var msg =
- string.Format("map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}",
- _memoryPerMapper,
- _memoryForUpdateTask,
- _coresPerMapper,
- _coresForUpdateTask);
+ string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.",
+ memoryPerMapper,
+ memoryForUpdateTask,
+ coresPerMapper,
+ coresForUpdateTask,
+ _maxRetryNumberForFaultTolerant,
+ allowedFailedEvaluators);
Logger.Log(Level.Info, msg);
}
+ #region IDriverStarted
/// <summary>
- /// Requests for evaluator for update task
+ /// Requests evaluators when driver starts
/// </summary>
/// <param name="value">Event fired when driver started</param>
public void OnNext(IDriverStarted value)
{
- RequestUpdateEvaluator();
//// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver.
+ _evaluatorManager.RequestUpdateEvaluator();
+ _evaluatorManager.RequestMapEvaluators(_totalMappers);
}
+ #endregion IDriverStarted
+ #region IAllocatedEvaluator
/// <summary>
- /// Specifies context and service configuration for evaluator depending
- /// on whether it is for Update function or for map function
+ /// IAllocatedEvaluator handler. It will take the following action based on the system state:
+ /// Case WaitingForEvaluator
+ /// Add Evaluator to the Evaluator Manager
+ /// submit Context and Services
+ /// Case Fail
+ /// Dispose the evaluator without submitting context or services. This is because the code that sets the system state to Fail has executed FailAction. It has shut down all the allocated evaluators/contexts.
+ /// If a new IAllocatedEvaluator comes after it, we should not submit anything; the evaluator is disposed so that it is returned.
+ /// Other cases - not expected
/// </summary>
/// <param name="allocatedEvaluator">The allocated evaluator</param>
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
- var configs =
- _serviceAndContextConfigurationProvider.GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id);
- allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service);
+ Logger.Log(Level.Info, "AllocatedEvaluator EvaluatorBatchId [{0}], memory [{1}], systemState {2}.", allocatedEvaluator.EvaluatorBatchId, allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState);
+ lock (_lock)
+ {
+ using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator"))
+ {
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.WaitingForEvaluator:
+ _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator);
+ SubmitContextAndService(allocatedEvaluator);
+ break;
+ case SystemState.Fail:
+ Logger.Log(Level.Info,
+ "Receiving IAllocatedEvaluator event, but system is in FAIL state, ignore it.");
+ allocatedEvaluator.Dispose();
+ break;
+ default:
+ UnexpectedState(allocatedEvaluator.Id, "IAllocatedEvaluator");
+ break;
+ }
+ }
+ }
}
/// <summary>
- /// Specifies the Map or Update task to run on the active context
+ /// Gets context and service configuration for evaluator depending
+ /// on whether it is for update/master function or for mapper function.
+ /// Then submits Context and Service with the corresponding configuration
/// </summary>
- /// <param name="activeContext"></param>
- public void OnNext(IActiveContext activeContext)
+ /// <param name="allocatedEvaluator"></param>
+ private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator)
{
- Logger.Log(Level.Verbose, string.Format("Received Active Context {0}", activeContext.Id));
-
- if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId))
+ ContextAndServiceConfiguration configs;
+ if (_evaluatorManager.IsEvaluatorForMaster(allocatedEvaluator))
{
- Logger.Log(Level.Verbose, "Submitting master task");
- _commGroup.AddTask(IMRUConstants.UpdateTaskName);
- _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), activeContext);
- RequestMapEvaluators(_totalMappers);
+ configs =
+ _serviceAndContextConfigurationProvider
+ .GetContextConfigurationForMasterEvaluatorById(
+ allocatedEvaluator.Id);
}
else
{
- Logger.Log(Level.Verbose, "Submitting map task");
- _serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId);
- string taskId = GetTaskIdByEvaluatorId(activeContext.EvaluatorId);
- _commGroup.AddTask(taskId);
- _groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), activeContext);
- Interlocked.Increment(ref _numberOfReadyTasks);
- Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready for submission", _numberOfReadyTasks));
+ configs = _serviceAndContextConfigurationProvider
+ .GetDataLoadingConfigurationForEvaluatorById(
+ allocatedEvaluator.Id);
}
+ allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service);
}
+ #endregion IAllocatedEvaluator
+ #region IActiveContext
/// <summary>
- /// Specifies what to do when the task is completed
- /// In this case just disposes off the task
+ /// IActiveContext handler. It will take the following actions based on the system state:
+ /// Case WaitingForEvaluator:
+ /// Adds Active Context to Active Context Manager
+ /// Case Fail:
+ /// Closes the ActiveContext
+ /// Other cases - not expected
/// </summary>
- /// <param name="completedTask">The link to the completed task</param>
- public void OnNext(ICompletedTask completedTask)
+ /// <param name="activeContext"></param>
+ public void OnNext(IActiveContext activeContext)
{
- lock (_completedTasks)
+ Logger.Log(Level.Info, "Received Active Context {0}, systemState {1}.", activeContext.Id, _systemState.CurrentState);
+ lock (_lock)
{
- Logger.Log(Level.Info,
- string.Format("Received completed task message from task Id: {0}", completedTask.Id));
- _completedTasks.Add(completedTask);
-
- if (AreIMRUTasksCompleted())
+ using (Logger.LogFunction("IMRUDriver::IActiveContext"))
{
- ShutDownAllEvaluators();
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.WaitingForEvaluator:
+ _contextManager.Add(activeContext);
+ break;
+ case SystemState.Fail:
+ Logger.Log(Level.Info,
+ "Received IActiveContext event, but system is in FAIL state. Closing the context.");
+ activeContext.Dispose();
+ break;
+ default:
+ UnexpectedState(activeContext.Id, "IActiveContext");
+ break;
+ }
}
}
}
+ #endregion IActiveContext
+ #region submit tasks
/// <summary>
- /// Specifies what to do when evaluator fails.
- /// If we get all completed tasks then ignore the failure
- /// Else request a new evaluator. If failure happens in middle of IMRU
- /// job we expect neighboring evaluators to fail while doing
- /// communication and will use FailedTask and FailedContext logic to
- /// order shutdown.
+ /// Called from ActiveContextManager when all the expected active context are received.
+ /// It changes the system state then calls SubmitTasks().
/// </summary>
/// <param name="value"></param>
- public void OnNext(IFailedEvaluator value)
+ public void OnNext(IEnumerable<IActiveContext> value)
{
- if (AreIMRUTasksCompleted())
+ Logger.Log(Level.Info, "Received event from ActiveContextManager with NumberOfActiveContexts:" + (value != null ? value.Count() : 0));
+ lock (_lock)
{
- Logger.Log(Level.Info,
- string.Format("Evaluator with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
- return;
+ // When the event AllContextsAreReady happens, change the system state from WaitingForEvaluator to SubmittingTasks
+ _systemState.MoveNext(SystemStateEvent.AllContextsAreReady);
+ SubmitTasks(value);
}
+ }
- Logger.Log(Level.Info,
- string.Format("Evaluator with Id: {0} failed with Exception: {1}", value.Id, value.EvaluatorException));
- int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators);
- if (currFailedEvaluators > _allowedFailedEvaluators)
+ /// <summary>
+ /// This method is responsible to prepare for the task submission then call SubmitTasks in TaskManager.
+ /// It is called in both first time and recovery scenarios.
+ /// Creates a new Communication Group and adds Group Communication Operators
+ /// For each context, adds a task to the communication group.
+ /// After all the tasks are added to the group, for each task, gets GroupCommTaskConfiguration from IGroupCommDriver
+ /// and merges it with the task configuration.
+ /// When all the tasks are added, calls TaskManager to SubmitTasks().
+ /// </summary>
+ private void SubmitTasks(IEnumerable<IActiveContext> activeContexts)
+ {
+ Logger.Log(Level.Info, "SubmitTasks with system state : {0} at time: {1}.", _systemState.CurrentState, DateTime.Now);
+ using (Logger.LogFunction("IMRUDriver::SubmitTasksConfiguration"))
{
- Exceptions.Throw(new MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators),
- Logger);
- }
+ if (!_isFirstTry)
+ {
+ _groupCommDriver.RemoveCommunicationGroup(IMRUConstants.CommunicationGroupName);
+ }
- _serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id);
- bool isMaster = _serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id);
+ UpdateMaterTaskId();
+ _taskManager = new TaskManager(_totalMappers + 1, _groupCommDriver.MasterTaskId);
+ var commGroup = AddCommunicationGroupWithOperators();
+ _perMapperConfigurationStack = ConstructPerMapperConfigStack(_totalMappers);
+
+ var taskIdAndContextMapping = new Dictionary<string, IActiveContext>();
+ foreach (var activeContext in activeContexts)
+ {
+ var taskId = _evaluatorManager.IsMasterEvaluatorId(activeContext.EvaluatorId)
+ ? _groupCommDriver.MasterTaskId
+ : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId);
+ commGroup.AddTask(taskId);
+ taskIdAndContextMapping.Add(taskId, activeContext);
+ }
+
+ foreach (var mapping in taskIdAndContextMapping)
+ {
+ var taskConfig = _evaluatorManager.IsMasterEvaluatorId(mapping.Value.EvaluatorId)
+ ? GetMasterTaskConfiguration(mapping.Key)
+ : GetMapperTaskConfiguration(mapping.Value, mapping.Key);
+ var groupCommTaskConfiguration = _groupCommDriver.GetGroupCommTaskConfiguration(mapping.Key);
+ var mergedTaskConf = Configurations.Merge(taskConfig, groupCommTaskConfiguration);
+ _taskManager.AddTask(mapping.Key, mergedTaskConf, mapping.Value);
+ }
+ }
+ _taskManager.SubmitTasks();
+ }
- // If failed evaluator is master then ask for master
- // evaluator else ask for mapper evaluator
- if (!isMaster)
+ /// <summary>
+ /// Stamps the master task id held by the group communication driver with the current retry number.
+ /// On the first try "-{retry}" is appended to the id. On later tries the previous retry suffix
+ /// (everything after the last '-') is replaced, so the id stays well-formed even when the
+ /// retry number has more than one digit.
+ /// </summary>
+ private void UpdateMaterTaskId()
+ {
+ if (_isFirstTry)
{
- Logger.Log(Level.Info, string.Format("Requesting a replacement map Evaluator for {0}", value.Id));
- RequestMapEvaluators(1);
+ _groupCommDriver.MasterTaskId = _groupCommDriver.MasterTaskId + "-" + _numberOfRetries;
}
else
{
- Logger.Log(Level.Info, string.Format("Requesting a replacement master Evaluator for {0}", value.Id));
- RequestUpdateEvaluator();
+ // The first try always appended "-<retry>", so the last '-' marks the start of the old suffix.
+ // Cutting at that separator (instead of dropping a single character) keeps ids correct for retry >= 10.
+ var masterTaskId = _groupCommDriver.MasterTaskId;
+ _groupCommDriver.MasterTaskId =
+ masterTaskId.Substring(0, masterTaskId.LastIndexOf('-') + 1) +
+ _numberOfRetries;
}
}
+ #endregion submit tasks
+ #region IRunningTask
/// <summary>
- /// Specifies what to do if Failed Context is received.
- /// An exception is thrown if tasks are not completed.
+ /// IRunningTask handler. The method is called when a task is running. The following action will be taken based on the system state:
+ /// Case SubmittingTasks
+ /// Add it to RunningTasks and set task state to TaskRunning
+ /// When all the tasks are running, change system state to TasksRunning
+ /// Case ShuttingDown/Fail
+ /// Call TaskManager to record RunningTask during SystemFailure
+ /// Other cases - not expected
/// </summary>
- /// <param name="value"></param>
- public void OnNext(IFailedContext value)
+ /// <param name="runningTask"></param>
+ public void OnNext(IRunningTask runningTask)
{
- if (AreIMRUTasksCompleted())
+ Logger.Log(Level.Info, "Received IRunningTask {0} from endpoint {1} at SystemState {2} retry # {3}.", runningTask.Id, GetEndPointFromTaskId(runningTask.Id), _systemState.CurrentState, _numberOfRetries);
+ lock (_lock)
{
- Logger.Log(Level.Info,
- string.Format("Context with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
- return;
+ using (Logger.LogFunction("IMRUDriver::IRunningTask"))
+ {
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.SubmittingTasks:
+ _taskManager.RecordRunningTask(runningTask);
+ if (_taskManager.AreAllTasksRunning())
+ {
+ _systemState.MoveNext(SystemStateEvent.AllTasksAreRunning);
+ Logger.Log(Level.Info,
+ "All tasks are running, SystemState {0}",
+ _systemState.CurrentState);
+ }
+ break;
+ case SystemState.ShuttingDown:
+ case SystemState.Fail:
+ _taskManager.RecordRunningTaskDuringSystemFailure(runningTask, TaskManager.CloseTaskByDriver);
+ break;
+ default:
+ UnexpectedState(runningTask.Id, "IRunningTask");
+ break;
+ }
+ }
}
- Exceptions.Throw(new Exception(string.Format("Data Loading Context with Id: {0} failed", value.Id)), Logger);
}
+ #endregion IRunningTask
+ #region ICompletedTask
/// <summary>
- /// Specifies what to do if a task fails.
- /// We throw the exception and fail IMRU unless IMRU job is already done.
+ /// ICompletedTask handler. It is called when a task is completed. The following action will be taken based on the System State:
+ /// Case TasksRunning
+ /// Updates task state to TaskCompleted
+ /// If all tasks are completed, sets system state to TasksCompleted and then go to Done action
+ /// Case ShuttingDown
+ /// Updates task state to TaskCompleted
+ /// Try to recover
+ /// Other cases - not expected
/// </summary>
- /// <param name="value"></param>
- public void OnNext(IFailedTask value)
+ /// <param name="completedTask">The link to the completed task</param>
+ public void OnNext(ICompletedTask completedTask)
{
- if (AreIMRUTasksCompleted())
+ Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries);
+ lock (_lock)
{
- Logger.Log(Level.Info,
- string.Format("Task with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id));
- return;
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.TasksRunning:
+ _taskManager.RecordCompletedTask(completedTask);
+ if (_taskManager.AreAllTasksCompleted())
+ {
+ _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
+ Logger.Log(Level.Info, "All tasks are completed, systemState {0}", _systemState.CurrentState);
+ DoneAction();
+ }
+ break;
+ case SystemState.ShuttingDown:
+ // The task might be in running state or waiting for close, record the completed task
+ _taskManager.RecordCompletedTask(completedTask);
+ TryRecovery();
+ break;
+ default:
+ UnexpectedState(completedTask.Id, "ICompletedTask");
+ break;
+ }
+ }
+ }
+ #endregion ICompletedTask
+
+ #region IFailedEvaluator
+ /// <summary>
+ /// IFailedEvaluator handler. It specifies what to do when an evaluator fails.
+ /// If we get all completed tasks then ignore the failure. Otherwise, take the following actions based on the system state:
+ /// Case WaitingForEvaluator
+ /// This happens in the middle of submitting contexts. We just need to remove the failed evaluator
+ /// from EvaluatorManager and remove associated active context, if any, from ActiveContextManager
+ /// then checks if the system is recoverable. If yes, request another Evaluator
+ /// If not recoverable, set system state to Fail then execute Fail action
+ /// Case SubmittingTasks/TasksRunning
+ /// This happens either in the middle of Task submitting or all the tasks are running
+ /// Changes the system state to ShuttingDown
+ /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
+ /// Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure
+ /// Closes all the other running tasks
+ /// Try to recover in case it is the last failure received
+ /// Case ShuttingDown
+ /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
+ /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
+ /// Removes associated task from running task if it was running, changes the task state to ClosedTask if it was waiting for close
+ /// otherwise changes the task state to FailedTaskEvaluatorError
+ /// Try to recover in case it is the last failure received
+ /// Other cases - not expected
+ /// </summary>
+ /// <param name="failedEvaluator"></param>
+ public void OnNext(IFailedEvaluator failedEvaluator)
+ {
+ var endpoint = failedEvaluator.FailedTask.IsPresent()
+ ? GetEndPoint(failedEvaluator.FailedTask.Value)
+ : failedEvaluator.FailedContexts.Any()
+ ? GetEndPointFromContext(failedEvaluator.FailedContexts.First())
+ : "unknown_endpoint";
+
+ Logger.Log(Level.Warning, "Received IFailedEvaluator {0} from endpoint {1} with systemState {2} in retry# {3} with Exception: {4}.", failedEvaluator.Id, endpoint, _systemState.CurrentState, _numberOfRetries, failedEvaluator.EvaluatorException);
+
+ lock (_lock)
+ {
+ using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
+ {
+ if (_taskManager != null && _taskManager.AreAllTasksCompleted())
+ {
+ Logger.Log(Level.Verbose,
+ "All IMRU tasks have been completed. So ignoring the Evaluator {0} failure.",
+ failedEvaluator.Id);
+ return;
+ }
+
+ var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
+ _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
+ _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
+
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.WaitingForEvaluator:
+ if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures())
+ {
+ if (isMaster)
+ {
+ Logger.Log(Level.Info, "Requesting a master Evaluator.");
+ _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
+ _evaluatorManager.RequestUpdateEvaluator();
+ }
+ else
+ {
+ _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+ failedEvaluator.Id);
+ Logger.Log(Level.Info, "Requesting mapper Evaluators.");
+ _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
+ _evaluatorManager.RequestMapEvaluators(1);
+ }
+ }
+ else
+ {
+ Logger.Log(Level.Error, "The system is not recoverable, change the state to Fail.");
+ _systemState.MoveNext(SystemStateEvent.NotRecoverable);
+ FailAction();
+ }
+ break;
+
+ case SystemState.SubmittingTasks:
+ case SystemState.TasksRunning:
+ // When the event FailedNode happens, change the system state to ShuttingDown
+ _systemState.MoveNext(SystemStateEvent.FailedNode);
+ _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+ _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+
+ // Push evaluator id back to PartitionIdProvider if it is not master
+ if (!isMaster)
+ {
+ _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+ failedEvaluator.Id);
+ }
+
+ TryRecovery();
+ break;
+
+ case SystemState.ShuttingDown:
+ _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+
+ // Push evaluator id back to PartitionIdProvider if it is not master
+ if (!isMaster)
+ {
+ _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+ failedEvaluator.Id);
+ }
+ TryRecovery();
+ break;
+
+ case SystemState.Fail:
+ break;
+
+ default:
+ UnexpectedState(failedEvaluator.Id, "IFailedEvaluator");
+ break;
+ }
+ }
+ }
+ }
+ #endregion IFailedEvaluator
+
+ #region IFailedContext
+ /// <summary>
+ /// IFailedContext handler. It specifies what to do when a failed context is received.
+ /// If all the tasks have already completed, the failure is logged and ignored; otherwise an exception is thrown.
+ /// Fault tolerance for failed contexts is not implemented here; it would be similar to the IFailedEvaluator handling.
+ /// </summary>
+ /// <param name="failedContext">The failed context reported to the driver</param>
+ public void OnNext(IFailedContext failedContext)
+ {
+ lock (_lock)
+ {
+ // _taskManager is only created in SubmitTasks(), so it is still null if a context fails while
+ // the driver is waiting for evaluators. Guard it the same way the IFailedEvaluator handler does.
+ if (_taskManager != null && _taskManager.AreAllTasksCompleted())
+ {
+ Logger.Log(Level.Info, "Context with Id: {0} failed but IMRU tasks are completed. So ignoring.", failedContext.Id);
+ return;
+ }
+
+ var msg = string.Format("Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
+ Exceptions.Throw(new Exception(msg), Logger);
+ }
+ }
+ #endregion IFailedContext
+
+ #region IFailedTask
+ /// <summary>
+ /// IFailedTask handler. It specifies what to do when task fails.
+ /// If we get all completed tasks then ignore the failure. Otherwise take the following actions based on the System state:
+ /// Case SubmittingTasks/TasksRunning
+ /// This is the first failure received
+ /// Changes the system state to ShuttingDown
+ /// Record failed task in TaskManager
+ /// Closes all the other running tasks and set their state to TaskWaitingForClose
+ /// Try to recover
+ /// Case ShuttingDown
+ /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
+ /// Record failed task in TaskManager.
+ /// Try to recover
+ /// Other cases - not expected
+ /// </summary>
+ /// <param name="failedTask"></param>
+ public void OnNext(IFailedTask failedTask)
+ {
+ Logger.Log(Level.Warning, "Received IFailedTask with Id: {0} and message: {1} from endpoint {2} with systemState {3} in retry#: {4}.", failedTask.Id, failedTask.Message, GetEndPointFromContext(failedTask.GetActiveContext()), _systemState.CurrentState, _numberOfRetries);
+ lock (_lock)
+ {
+ using (Logger.LogFunction("IMRUDriver::IFailedTask"))
+ {
+ if (_taskManager.AreAllTasksCompleted())
+ {
+ Logger.Log(Level.Info,
+ "Task with Id: {0} failed but all IMRU tasks are completed. So ignoring.",
+ failedTask.Id);
+ return;
+ }
+
+ switch (_systemState.CurrentState)
+ {
+ case SystemState.SubmittingTasks:
+ case SystemState.TasksRunning:
+ // When the event FailedNode happens, change the system state to ShuttingDown
+ _systemState.MoveNext(SystemStateEvent.FailedNode);
+ _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
+ _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+ TryRecovery();
+ break;
+
+ case SystemState.ShuttingDown:
+ _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask);
+ TryRecovery();
+ break;
+
+ default:
+ UnexpectedState(failedTask.Id, "IFailedTask");
+ break;
+ }
+ }
}
- Exceptions.Throw(new Exception(string.Format("Task with Id: {0} failed", value.Id)), Logger);
}
+ #endregion IFailedTask
public void OnError(Exception error)
{
@@ -282,42 +653,154 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
}
- private bool AreIMRUTasksCompleted()
+ private void UnexpectedState(string id, string eventName)
{
- return _completedTasks.Count >= _totalMappers + 1;
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Received {0} for [{1}], but system status is {2}.",
+ eventName,
+ id,
+ _systemState.CurrentState);
+ Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
- private string GetTaskIdByEvaluatorId(string evaluatorId)
+ /// <summary>
+ /// Acts only once all the tasks are in a final state: if the system is recoverable, starts recovery;
+ /// otherwise changes the system state to Fail and then takes the Fail action.
+ /// </summary>
+ private void TryRecovery()
{
- return string.Format("{0}-{1}-Version0",
+ if (_taskManager.AreAllTasksInFinalState())
+ {
+ if (IsRecoverable())
+ {
+ _isFirstTry = false;
+ RecoveryAction();
+ }
+ else
+ {
+ Logger.Log(Level.Warning, "The system is not recoverable, change the state to Fail.");
+ _systemState.MoveNext(SystemStateEvent.NotRecoverable);
+ FailAction();
+ }
+ }
+ }
+
+ private string GetMapperTaskIdByEvaluatorId(string evaluatorId)
+ {
+ return string.Format("{0}-{1}-{2}",
IMRUConstants.MapTaskPrefix,
- _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId));
+ _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId),
+ _numberOfRetries);
+ }
+
+ /// <summary>
+ /// This method is called when all the tasks are successfully completed.
+ /// </summary>
+ private void DoneAction()
+ {
+ ShutDownAllEvaluators();
+ Logger.Log(Level.Info, "DoneAction done in retry {0}!!!", _numberOfRetries);
+ }
+
+ /// <summary>
+ /// This method is called when there are failures and the system is not recoverable.
+ /// </summary>
+ private void FailAction()
+ {
+ ShutDownAllEvaluators();
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "The system cannot be recovered after {0} retries. NumberofFailedMappers in the last try is {1}.",
+ _numberOfRetries, _evaluatorManager.NumberofFailedMappers());
+ Exceptions.Throw(new ApplicationException(msg), Logger);
}
/// <summary>
- /// Shuts down evaluators once all completed task messages are received
+ /// Shuts down evaluators
/// </summary>
private void ShutDownAllEvaluators()
{
- foreach (var task in _completedTasks)
+ foreach (var context in _contextManager.ActiveContexts)
{
- Logger.Log(Level.Info, string.Format("Disposing task: {0}", task.Id));
- task.ActiveContext.Dispose();
+ Logger.Log(Level.Verbose, "Disposing active context: {0}", context.Id);
+ context.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// This method is called for recovery. It resets Failed Evaluators and changes state to WaitingForEvaluator
+ /// If there are no failed mappers, meaning the recovery is caused by failed tasks, resubmit all the tasks.
+ /// Else, based on the number of failed evaluators, requests missing map evaluators
+ /// </summary>
+ private void RecoveryAction()
+ {
+ lock (_lock)
+ {
+ _numberOfRetries++;
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Start recovery with _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}.",
+ _numberOfRetries,
+ _evaluatorManager.NumberofFailedMappers());
+ Logger.Log(Level.Info, msg);
+
+ _systemState.MoveNext(SystemStateEvent.Recover);
+
+ var mappersToRequest = _evaluatorManager.NumberofFailedMappers();
+ _evaluatorManager.ResetFailedEvaluators();
+
+ if (mappersToRequest == 0)
+ {
+ Logger.Log(Level.Info, "There is no failed Evaluator in this recovery but failed tasks.");
+ if (_contextManager.AreAllContextsReceived)
+ {
+ OnNext(_contextManager.ActiveContexts);
+ }
+ else
+ {
+ Exceptions.Throw(new IMRUSystemException("In recovery, there are no Failed evaluators but not all the contexts are received"), Logger);
+ }
+ }
+ else
+ {
+ Logger.Log(Level.Info, "Requesting {0} map Evaluators.", mappersToRequest);
+ _evaluatorManager.RequestMapEvaluators(mappersToRequest);
+ }
}
}
/// <summary>
- /// Generates map task configuration given the active context.
+ /// Checks if the system is recoverable.
+ /// </summary>
+ /// <returns></returns>
+ private bool IsRecoverable()
+ {
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "IsRecoverable: _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}, NumberOfAppErrors {2}, IsMasterEvaluatorFailed {3} AllowedNumberOfEvaluatorFailures {4}, _maxRetryNumberForFaultTolerant {5}.",
+ _numberOfRetries,
+ _evaluatorManager.NumberofFailedMappers(),
+ _taskManager.NumberOfAppErrors(),
+ _evaluatorManager.IsMasterEvaluatorFailed(),
+ _evaluatorManager.AllowedNumberOfEvaluatorFailures,
+ _maxRetryNumberForFaultTolerant);
+ Logger.Log(Level.Info, msg);
+
+ return !_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()
+ && _taskManager.NumberOfAppErrors() == 0
+ && !_evaluatorManager.IsMasterEvaluatorFailed()
+ && _numberOfRetries < _maxRetryNumberForFaultTolerant;
+ }
+
+ /// <summary>
+ /// Generates map task configuration given the active context.
/// Merge configurations of all the inputs to the MapTaskHost.
/// </summary>
/// <param name="activeContext">Active context to which task needs to be submitted</param>
/// <param name="taskId">Task Id</param>
/// <returns>Map task configuration</returns>
- private IConfiguration GetMapTaskConfiguration(IActiveContext activeContext, string taskId)
+ private IConfiguration GetMapperTaskConfiguration(IActiveContext activeContext, string taskId)
{
IConfiguration mapSpecificConfig;
- if (!_perMapperConfiguration.TryPop(out mapSpecificConfig))
+ if (!_perMapperConfigurationStack.TryPop(out mapSpecificConfig))
{
Exceptions.Throw(
new IMRUSystemException(string.Format("No per map configuration exist for the active context {0}",
@@ -343,13 +826,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// Merge configurations of all the inputs to the UpdateTaskHost.
/// </summary>
/// <returns>Update task configuration</returns>
- private IConfiguration GetUpdateTaskConfiguration()
+ private IConfiguration GetMasterTaskConfiguration(string taskId)
{
var partialTaskConf =
TangFactory.GetTang()
.NewConfigurationBuilder(TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier,
- IMRUConstants.UpdateTaskName)
+ taskId)
.Set(TaskConfiguration.Task,
GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
.Set(TaskConfiguration.OnClose,
@@ -384,8 +867,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
/// <summary>
- /// Generate the group communicaiton configuration to be added
- /// to the tasks
+ /// Creates the group communication configuration to be added to the tasks
/// </summary>
/// <returns>The group communication configuration</returns>
private IConfiguration GetGroupCommConfiguration()
@@ -406,12 +888,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <summary>
/// Adds broadcast and reduce operators to the default communication group
/// </summary>
- private void AddGroupCommunicationOperators()
+ private ICommunicationGroupDriver AddCommunicationGroupWithOperators()
{
var reduceFunctionConfig = _configurationManager.ReduceFunctionConfiguration;
var mapOutputPipelineDataConverterConfig = _configurationManager.MapOutputPipelineDataConverterConfiguration;
var mapInputPipelineDataConverterConfig = _configurationManager.MapInputPipelineDataConverterConfiguration;
+ // TODO check the specific exception type
try
{
TangFactory.GetTang()
@@ -452,25 +935,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
.Build();
}
- _commGroup =
- _groupCommDriver.DefaultGroup
+ var commGroup =
+ _groupCommDriver.NewCommunicationGroup(IMRUConstants.CommunicationGroupName, _totalMappers + 1)
.AddBroadcast<MapInputWithControlMessage<TMapInput>>(
IMRUConstants.BroadcastOperatorName,
- IMRUConstants.UpdateTaskName,
+ _groupCommDriver.MasterTaskId,
TopologyTypes.Tree,
mapInputPipelineDataConverterConfig)
.AddReduce<TMapOutput>(
IMRUConstants.ReduceOperatorName,
- IMRUConstants.UpdateTaskName,
+ _groupCommDriver.MasterTaskId,
TopologyTypes.Tree,
reduceFunctionConfig,
mapOutputPipelineDataConverterConfig)
.Build();
+
+ return commGroup;
}
/// <summary>
- /// Construct the stack of map configuraion which
- /// is specific to each mapper. If user does not
+ /// Construct the stack of map configuration which is specific to each mapper. If user does not
/// specify any then its empty configuration
/// </summary>
/// <param name="totalMappers">Total mappers</param>
@@ -490,30 +974,47 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
/// <summary>
- /// Request map evaluators from resource manager
+ /// Looks up the endpoint for the given task id.
/// </summary>
- /// <param name="numEvaluators">Number of evaluators to request</param>
- private void RequestMapEvaluators(int numEvaluators)
+ /// <param name="taskId">Registered identifier in name server</param>
+ /// <returns></returns>
+ private string GetEndPointFromTaskId(string taskId)
{
- _evaluatorRequestor.Submit(
- _evaluatorRequestor.NewBuilder()
- .SetMegabytes(_memoryPerMapper)
- .SetNumber(numEvaluators)
- .SetCores(_coresPerMapper)
- .Build());
+ List<string> t = new List<string>();
+ t.Add(taskId);
+ var ips = _nameServer.Lookup(t);
+ if (ips.Count > 0)
+ {
+ var ip = ips.FirstOrDefault();
+ if (ip != null)
+ {
+ return ip.Endpoint.ToString();
+ }
+ }
+ return null;
}
- /// <summary>
- /// Request update/master evaluator from resource manager
- /// </summary>
- private void RequestUpdateEvaluator()
- {
- _evaluatorRequestor.Submit(
- _evaluatorRequestor.NewBuilder()
- .SetCores(_coresForUpdateTask)
- .SetMegabytes(_memoryForUpdateTask)
- .SetNumber(1)
- .Build());
+ private string GetEndPoint(IFailedTask failedTask)
+ {
+ return GetEndPointFromTaskId(failedTask.Id) ?? GetEndPointFromContext(failedTask.GetActiveContext());
+ }
+
+ private string GetEndPointFromContext(IFailedContext context)
+ {
+ if (context == null || context.EvaluatorDescriptor == null || context.EvaluatorDescriptor.NodeDescriptor == null)
+ {
+ return null;
+ }
+ return context.EvaluatorDescriptor.NodeDescriptor.HostName;
+ }
+
+ private string GetEndPointFromContext(Optional<IActiveContext> context)
+ {
+ if (!context.IsPresent() || context.Value == null || context.Value.EvaluatorDescriptor == null || context.Value.EvaluatorDescriptor.NodeDescriptor == null)
+ {
+ return null;
+ }
+ return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 36916db..24a2b9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -17,36 +17,38 @@
using System;
using System.Collections.Generic;
+using System.Globalization;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
- /// Class that handles failed evaluators and also provides Service
- /// and Context configuration
+ /// Class that provides Service and Context configuration
/// </summary>
/// <typeparam name="TMapInput"></typeparam>
/// <typeparam name="TMapOutput"></typeparam>
/// <typeparam name="TPartitionType"></typeparam>
+ [NotThreadSafe]
internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();
- private readonly ISet<string> _submittedEvaluators = new HashSet<string>();
- private readonly ISet<string> _contextLoadedEvaluators = new HashSet<string>();
- private readonly object _lock = new object();
private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();
private readonly IPartitionedInputDataSet _dataset;
- private string _masterEvaluatorId;
+ /// <summary>
+ /// Constructs the object which maintains partitionDescriptor Ids in order to provide the proper data load configuration
+ /// </summary>
+ /// <param name="dataset"></param>
internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
{
_dataset = dataset;
@@ -57,120 +59,34 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
/// <summary>
- /// Handles failed evaluator. Moves the id from
- /// submitted evaluator to failed evaluator
+ /// Handles a failed evaluator. Pushes the partitionId back to the Partition Descriptor Id stack and
+ /// removes the evaluatorId from the Partition Id Provider collection
/// </summary>
/// <param name="evaluatorId"></param>
/// <returns>Whether failed evaluator is master or not</returns>
- internal bool RecordEvaluatorFailureById(string evaluatorId)
+ internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId)
{
- lock (_lock)
+ if (!_partitionIdProvider.ContainsKey(evaluatorId))
{
- string msg;
- bool isMaster = IsMasterEvaluatorId(evaluatorId);
-
- if (_contextLoadedEvaluators.Contains(evaluatorId))
- {
- msg =
- string.Format(
- "Failed evaluator:{0} already had context loaded. Cannot handle failure at this stage",
- evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- if (!_submittedEvaluators.Contains(evaluatorId))
- {
- msg = string.Format("Failed evaluator:{0} was never submitted", evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- if (!_partitionIdProvider.ContainsKey(evaluatorId) && !isMaster)
- {
- msg = string.Format("Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- _submittedEvaluators.Remove(evaluatorId);
-
- if (isMaster)
- {
- Logger.Log(Level.Info, "Failed Evaluator is Master");
- _masterEvaluatorId = null;
- return true;
- }
-
- Logger.Log(Level.Info, "Failed Evaluator is a Mapper");
- _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
- _partitionIdProvider.Remove(evaluatorId);
- return false;
+ var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
+ Exceptions.Throw(new Exception(msg), Logger);
}
+ _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
+ _partitionIdProvider.Remove(evaluatorId);
}
/// <summary>
- /// Notifies that active context state has been reached
+ /// Gets Context and Service configuration for Master
/// </summary>
/// <param name="evaluatorId"></param>
- internal void RecordActiveContextPerEvaluatorId(string evaluatorId)
- {
- lock (_lock)
- {
- if (!_submittedEvaluators.Contains(evaluatorId))
- {
- var msg = string.Format("Evaluator:{0} never loaded data but still reached active context stage",
- evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- if (_contextLoadedEvaluators.Contains(evaluatorId))
- {
- var msg = string.Format("Evaluator:{0} already reached the active context stage", evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- _contextLoadedEvaluators.Add(evaluatorId);
- _submittedEvaluators.Remove(evaluatorId);
- }
- }
-
- /// <summary>
- /// Gets next context configuration. Either master or mapper.
- /// </summary>
- /// <param name="evaluatorId">Evaluator Id</param>
- /// <returns>The context and service configuration</returns>
- internal ContextAndServiceConfiguration GetContextConfigurationForEvaluatorById(string evaluatorId)
- {
- lock (_lock)
- {
- if (_submittedEvaluators.Contains(evaluatorId))
- {
- string msg = string.Format("The context is already submitted to evaluator:{0}", evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
- }
-
- if (_masterEvaluatorId == null)
- {
- Logger.Log(Level.Info, "Submitting root context and service for master");
- _masterEvaluatorId = evaluatorId;
- _submittedEvaluators.Add(evaluatorId);
- return new ContextAndServiceConfiguration(
- ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
- IMRUConstants.MasterContextId).Build(),
- TangFactory.GetTang().NewConfigurationBuilder().Build());
- }
-
- Logger.Log(Level.Info, "Submitting root context and service for a map task");
- return GetDataLoadingConfigurationForEvaluatorById(evaluatorId);
- }
- }
-
- /// <summary>
- /// Checks whether evaluator id is that of master
- /// </summary>
- /// <param name="evaluatorId">Id of evaluator</param>
- /// <returns>true if id is that of master</returns>
- internal bool IsMasterEvaluatorId(string evaluatorId)
+ /// <returns></returns>
+ internal ContextAndServiceConfiguration GetContextConfigurationForMasterEvaluatorById(string evaluatorId)
{
- return evaluatorId.Equals(_masterEvaluatorId);
+ Logger.Log(Level.Info, "Getting root context and service configuration for master");
+ return new ContextAndServiceConfiguration(
+ ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
+ IMRUConstants.MasterContextId).Build(),
+ TangFactory.GetTang().NewConfigurationBuilder().Build());
}
/// <summary>
@@ -180,29 +96,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <returns></returns>
internal string GetPartitionIdByEvaluatorId(string evaluatorId)
{
- lock (_lock)
+ if (!_partitionIdProvider.ContainsKey(evaluatorId))
{
- string msg;
- if (!_submittedEvaluators.Contains(evaluatorId) && !_contextLoadedEvaluators.Contains(evaluatorId))
- {
- msg = string.Format("Context for Evaluator:{0} has never been submitted", evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
- }
-
- if (IsMasterEvaluatorId(evaluatorId))
- {
- msg = string.Format("Evaluator:{0} is master and does not get partition", evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
- }
-
- if (!_partitionIdProvider.ContainsKey(evaluatorId))
- {
- msg = string.Format("Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
- }
-
- return _partitionIdProvider[evaluatorId];
+ var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
+ Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
+
+ return _partitionIdProvider[evaluatorId];
}
/// <summary>
@@ -211,16 +111,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// </summary>
/// <param name="evaluatorId"></param>
/// <returns></returns>
- private ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
+ internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
{
- string msg;
-
- if (_contextLoadedEvaluators.Contains(evaluatorId))
- {
- msg = string.Format("Evaluator:{0} already has the data loaded", evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
- }
-
if (_partitionDescriptorIds.Count == 0)
{
Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
@@ -228,8 +120,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
if (_partitionIdProvider.ContainsKey(evaluatorId))
{
- msg =
+ var msg =
string.Format(
+ CultureInfo.InvariantCulture,
"Evaluator Id:{0} already present in configuration cache, they have to be unique",
evaluatorId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
@@ -237,14 +130,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
Logger.Log(Level.Info, "Getting a new data loading configuration");
_partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
- _submittedEvaluators.Add(evaluatorId);
-
- msg = string.Format(
- "Current status: Submitted Evaluators-{0}, Data Loaded Evaluators-{1}, Unused data partitions-{2}",
- _submittedEvaluators.Count,
- _contextLoadedEvaluators.Count,
- _partitionDescriptorIds.Count);
- Logger.Log(Level.Info, msg);
try
{
@@ -254,14 +139,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
catch (Exception e)
{
- msg = string.Format("Error while trying to access partition descriptor:{0} from dataset",
+ var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset",
_partitionIdProvider[evaluatorId]);
Exceptions.Throw(e, msg, Logger);
return null;
}
}
- private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
+ /// <summary>
+ /// Creates service and data loading context configuration for given evaluator id
+ /// </summary>
+ /// <param name="partitionDescriptor"></param>
+ /// <param name="evaluatorId"></param>
+ /// <returns></returns>
+ private static ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
IPartitionDescriptor partitionDescriptor,
string evaluatorId)
{
@@ -286,4 +177,4 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
return new ContextAndServiceConfiguration(contextConf, serviceConf);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index 3bf6d75..a37fa3b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -1,4 +1,4 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+\ufeff\ufeff// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -15,15 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
@@ -202,7 +205,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <summary>
/// This method is called when receiving ICompletedTask event during task running or system shutting down.
/// Removes the task from running tasks if it was running
- /// Changes the task state from RunningTask to CompletedTask
+ /// Changes the task state from RunningTask to CompletedTask if the task was running
+ /// Changes the task state from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
/// </summary>
/// <param name="completedTask"></param>
internal void RecordCompletedTask(ICompletedTask completedTask)
@@ -233,6 +237,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <param name="failedTask"></param>
internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
{
+ Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString());
+
var taskState = GetTaskState(failedTask.Id);
if (taskState == StateMachine.TaskState.TaskWaitingForClose)
{
@@ -260,7 +266,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
if (!_runningTasks.ContainsKey(taskId))
{
- var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "The task [{0}] doesn't exist in Running Tasks.",
+ taskId);
Exceptions.Throw(new IMRUSystemException(msg), Logger);
}
_runningTasks.Remove(taskId);
@@ -268,6 +276,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
}
+ else
+ {
+ var taskId = FindTaskAssociatedWithTheEvalutor(failedEvaluator.Id);
+ var taskState = GetTaskState(taskId);
+ if (taskState == StateMachine.TaskState.TaskSubmitted)
+ {
+ UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+ }
+ }
+ }
+
+ private string FindTaskAssociatedWithTheEvalutor(string evaluatorId)
+ {
+ return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault();
}
/// <summary>
@@ -346,13 +368,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
/// <summary>
- /// Gets error type based on the exception type in IFailedTask
+ /// Gets error type (encoded as TaskStateEvent) based on the exception type in IFailedTask.
+ /// For unknown exceptions or exceptions that don't belong to the defined IMRU task exceptions,
+ /// the method falls back to TaskStateEvent.FailedTaskSystemError.
/// </summary>
/// <param name="failedTask"></param>
/// <returns></returns>
private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask)
{
var exception = failedTask.AsError();
+ var innerExceptionType = exception.InnerException != null ? exception.InnerException.GetType().ToString() : "InnerException null";
+ var innerExceptionMsg = exception.InnerException != null ? exception.InnerException.Message : "No InnerException";
+
+
+ if (failedTask.GetActiveContext().IsPresent())
+ {
+ Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}, evaluator id: {4}",
+ failedTask.Id,
+ exception.GetType(),
+ innerExceptionType,
+ innerExceptionMsg,
+ failedTask.GetActiveContext().Value.EvaluatorId);
+ }
+ else
+ {
+ Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}",
+ failedTask.Id,
+ exception.GetType(),
+ innerExceptionType,
+ innerExceptionMsg);
+ }
+
if (exception is IMRUTaskAppException)
{
_numberOfAppErrors++;
@@ -362,10 +408,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
return TaskStateEvent.FailedTaskCommunicationError;
}
- else
+ if (exception is IMRUTaskSystemException)
{
return TaskStateEvent.FailedTaskSystemError;
}
+
+ // special case for communication error during group communication initialization
+ if (exception is TaskClientCodeException)
+ {
+ // try extract cause and check whether it is InjectionException for GroupCommClient
+ if (exception.InnerException != null &&
+ exception.InnerException is InjectionException &&
+ exception.InnerException.Message.Contains("GroupCommClient"))
+ {
+ Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType:FailedTaskCommunicationError with task id {0}", failedTask.Id);
+ return TaskStateEvent.FailedTaskCommunicationError;
+ }
+ }
+
+ Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType for un-hanlded exception with task id {0} and exception type {1}", failedTask.Id, exception.GetType());
+ return TaskStateEvent.FailedTaskSystemError;
}
/// <summary>
@@ -381,9 +443,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// Checks if all the tasks are in final states
/// </summary>
/// <returns></returns>
- internal bool AllInFinalState()
+ internal bool AreAllTasksInFinalState()
{
- return _tasks.All(t => t.Value.TaskState.IsFinalState());
+ var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList();
+ var count = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Count();
+
+ if (notInFinalState.Any())
+ {
+ Logger.Log(Level.Info, "Total tasks that are not in final state: {0}, and first 5 are:\r\n {1}", count, string.Join("\r\n", notInFinalState.Select(ToLog)));
+ }
+ else
+ {
+ Logger.Log(Level.Info, "All the tasks are in final state");
+ }
+
+ return !notInFinalState.Any();
+ }
+
+ private string ToLog(KeyValuePair<string, TaskInfo> t)
+ {
+ try
+ {
+ return string.Format("State={0}, taskId={1}, ContextId={2}, evaluatorId={3}, evaluatorHost={4}",
+ t.Value.TaskState.CurrentState,
+ t.Key,
+ t.Value.ActiveContext.Id,
+ t.Value.ActiveContext.EvaluatorId,
+ t.Value.ActiveContext.EvaluatorDescriptor.NodeDescriptor.HostName);
+ }
+ catch (Exception ex)
+ {
+ return string.Format("Failed to get task string: {0}", ex);
+ }
}
/// <summary>
@@ -415,18 +506,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// </summary>
internal void SubmitTasks()
{
- if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+ using (Logger.LogFunction("TaskManager::SubmitTasks"))
{
- string msg = string.Format("Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", NumberOfTasks, _totalExpectedTasks);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
- }
+ if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
+ {
+ string msg =
+ string.Format(
+ "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].",
+ NumberOfTasks,
+ _totalExpectedTasks);
+ Exceptions.Throw(new IMRUSystemException(msg), Logger);
+ }
- foreach (var taskId in _tasks.Keys)
- {
- var taskInfo = GetTaskInfo(taskId);
- taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
- UpdateState(taskId, TaskStateEvent.SubmittedTask);
- }
+ SubmitTask(_masterTaskId);
+
+ foreach (var taskId in _tasks.Keys)
+ {
+ if (taskId.Equals(_masterTaskId))
+ {
+ continue;
+ }
+ SubmitTask(taskId);
+ }
+ }
+ }
+
+ private void SubmitTask(string taskId)
+ {
+ Logger.Log(Level.Info, "SubmitTask with task id: {0}.", taskId);
+ var taskInfo = GetTaskInfo(taskId);
+ taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
+ UpdateState(taskId, TaskStateEvent.SubmittedTask);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index bce1e4d..ca2fb85 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -17,7 +17,8 @@
using System;
using System.IO;
-using System.Text;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
using System.Threading;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
@@ -75,13 +76,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
/// <param name="groupCommunicationsClient">Used to setup the communications.</param>
/// <param name="taskCloseCoordinator">Task close Coordinator</param>
/// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param>
+ /// <param name="taskId">task id</param>
[Inject]
private MapTaskHost(
IMapFunction<TMapInput, TMapOutput> mapTask,
IGroupCommClient groupCommunicationsClient,
TaskCloseCoordinator taskCloseCoordinator,
- [Parameter(typeof(InvokeGC))] bool invokeGC)
+ [Parameter(typeof(InvokeGC))] bool invokeGC,
+ [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId)
{
+ Logger.Log(Level.Info, "Entering constructor of MapTaskHost for task id {0}", taskId);
_mapTask = mapTask;
_groupCommunicationsClient = groupCommunicationsClient;
var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
@@ -91,6 +95,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
_invokeGC = invokeGC;
_taskCloseCoordinator = taskCloseCoordinator;
_cancellationSource = new CancellationTokenSource();
+ Logger.Log(Level.Info, "MapTaskHost initialized.");
}
/// <summary>
@@ -100,21 +105,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
/// <returns></returns>
public byte[] Call(byte[] memento)
{
+ Logger.Log(Level.Info, "Entering MapTaskHost Call().");
MapControlMessage controlMessage = MapControlMessage.AnotherRound;
-
- while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop)
+ try
{
- if (_invokeGC)
+ while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop)
{
- Logger.Log(Level.Verbose, "Calling Garbage Collector");
- GC.Collect();
- GC.WaitForPendingFinalizers();
- }
+ if (_invokeGC)
+ {
+ Logger.Log(Level.Verbose, "Calling Garbage Collector");
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ }
- try
- {
using (
- MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive(_cancellationSource))
+ MapInputWithControlMessage<TMapInput> mapInput =
+ _dataAndMessageReceiver.Receive(_cancellationSource))
{
controlMessage = mapInput.ControlMessage;
if (controlMessage != MapControlMessage.Stop)
@@ -123,32 +129,77 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
}
}
}
- catch (OperationCanceledException e)
- {
- Logger.Log(Level.Warning, "Received OperationCanceledException in MapTaskHost with message: {0}.", e.Message);
- break;
- }
- catch (IOException e)
+ }
+ catch (OperationCanceledException e)
+ {
+ Logger.Log(Level.Warning,
+ "Received OperationCanceledException in MapTaskHost with message: {0}. The cancellation token is: {1}.",
+ e.Message,
+ _cancellationSource.IsCancellationRequested);
+ }
+ catch (Exception e)
+ {
+ if (e is IOException || e is TcpClientConnectionException || e is RemotingException ||
+ e is SocketException)
{
- Logger.Log(Level.Error, "Received IOException in MapTaskHost with message: {0}.", e.Message);
+ Logger.Log(Level.Error,
+ "Received Exception {0} in MapTaskHost with message: {1}. The cancellation token is: {2}.",
+ e.GetType(),
+ e.Message,
+ _cancellationSource.IsCancellationRequested);
if (!_cancellationSource.IsCancellationRequested)
{
+ Logger.Log(Level.Error,
+ "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+ _cancellationSource.IsCancellationRequested);
throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
}
- break;
}
- catch (TcpClientConnectionException e)
+ else if (e is AggregateException)
+ {
+ Logger.Log(Level.Error,
+ "Received AggregateException. The cancellation token is: {0}.",
+ _cancellationSource.IsCancellationRequested);
+ if (e.InnerException != null)
+ {
+ Logger.Log(Level.Error,
+ "InnerException {0}, with message {1}.",
+ e.InnerException.GetType(),
+ e.InnerException.Message);
+ }
+ if (!_cancellationSource.IsCancellationRequested)
+ {
+ if (e.InnerException != null && e.InnerException is IOException)
+ {
+ Logger.Log(Level.Error,
+ "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+ _cancellationSource.IsCancellationRequested);
+ throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ else
{
- Logger.Log(Level.Error, "Received TcpClientConnectionException in MapTaskHost with message: {0}.", e.Message);
+ Logger.Log(Level.Error,
+ "MapTask is throwing Exception {0}, message {1} with cancellation token: {2} and StackTrace {3}.",
+ e.GetType(),
+ e.Message,
+ _cancellationSource.IsCancellationRequested,
+ e.StackTrace);
if (!_cancellationSource.IsCancellationRequested)
{
- throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+ throw e;
}
- break;
}
}
-
- _taskCloseCoordinator.SignalTaskStopped();
+ finally
+ {
+ _taskCloseCoordinator.SignalTaskStopped();
+ }
Logger.Log(Level.Info, "MapTaskHost returned with cancellation token:{0}.", _cancellationSource.IsCancellationRequested);
return null;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
index a9014c3..af20809 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
@@ -15,12 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-using System;
using System.Text;
using System.Threading;
using Org.Apache.REEF.Common.Tasks.Events;
-using Org.Apache.REEF.IMRU.OnREEF.Driver;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
@@ -56,6 +53,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
/// <param name="cancellationTokenSource"></param>
internal void HandleEvent(ICloseEvent closeEvent, CancellationTokenSource cancellationTokenSource)
{
+ Logger.Log(Level.Info, "HandleEvent: The task received close event");
cancellationTokenSource.Cancel();
_waitToCloseEvent.Wait();