You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2017/08/30 22:54:29 UTC
reef git commit: [REEF-1842] Making IMRU task and input data
association deterministic
Repository: reef
Updated Branches:
refs/heads/master d609c002c -> 4d1b80e10
[REEF-1842] Making IMRU task and input data association deterministic
* Use the partitionDescriptor id as part of the context id so that each context id is associated with a fixed partitionDescriptor. Note that the partition descriptor id always follows a pattern like RandomInputPartition-x and is unique.
* Store contexts in a sorted collection to ensure that tasks with their associated context/partitionDescriptor are always added to the group in the same sequence.
JIRA: [REEF-1842](https://issues.apache.org/jira/browse/REEF-1842)
This closes #1346
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4d1b80e1
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4d1b80e1
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4d1b80e1
Branch: refs/heads/master
Commit: 4d1b80e107da81510ec15e9ca72569fc6cbbac72
Parents: d609c00
Author: Julia Wang <ju...@apache.org>
Authored: Wed Jul 26 19:04:17 2017 -0700
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Wed Aug 30 15:49:51 2017 -0700
----------------------------------------------------------------------
.../OnREEF/Driver/ActiveContextManager.cs | 4 +-
.../OnREEF/Driver/IMRUDriver.cs | 1 +
.../PartitionDescriptorContextIdBundle.cs | 41 ++++++++
.../ServiceAndContextConfigurationProvider.cs | 101 ++++++++++++-------
.../Org.Apache.REEF.IMRU.csproj | 1 +
5 files changed, 112 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
index 437b76f..d73abf6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
@@ -35,7 +35,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager));
- private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>();
+ private readonly IDictionary<string, IActiveContext> _activeContexts = new SortedDictionary<string, IActiveContext>();
private readonly int _totalExpectedContexts;
private IObserver<IEnumerable<IActiveContext>> _activeContextObserver;
@@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
if (AreAllContextsReceived && _activeContextObserver != null)
{
- _activeContextObserver.OnNext(_activeContexts.Values);
+ _activeContextObserver.OnNext(ActiveContexts);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 308a079..3db1bcd 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -392,6 +392,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
: GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId);
commGroup.AddTask(taskId);
taskIdAndContextMapping.Add(taskId, activeContext);
+ Logger.Log(Level.Info, "Adding {0} with associated context: {1} to communication group: {2}.", taskId, activeContext.Id, IMRUConstants.CommunicationGroupName);
}
foreach (var mapping in taskIdAndContextMapping)
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
new file mode 100644
index 0000000..bb0b118
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Keeps the mapping between a context id and a partition descriptor id so that the context with the same id is always bundled with the same partition data.
+ /// </summary>
+ internal sealed class PartitionDescriptorContextIdBundle
+ {
+ /// <summary>
+ /// Create an object of PartitionDescriptorContextIdBundle to maintain the relationship.
+ /// If an evaluator failed and a new context is created, the same context id with the same partition data will be assigned to the new context.
+ /// </summary>
+ /// <param name="partitionDescriptorId"></param>
+ /// <param name="contextId"></param>
+ internal PartitionDescriptorContextIdBundle(string partitionDescriptorId, string contextId)
+ {
+ PartitionDescriptorId = partitionDescriptorId;
+ ContextId = contextId;
+ }
+
+ internal string PartitionDescriptorId { get; private set; }
+
+ internal string ContextId { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 0d4e27b..a5c8422 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -40,24 +40,46 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
[NotThreadSafe]
internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
{
- private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
+ private static readonly Logger Logger
+ = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
- private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();
- private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();
+ /// <summary>
+ /// Mapping between Evaluator id and assigned partition descriptor/context ids
+ /// </summary>
+ private readonly Dictionary<string, PartitionDescriptorContextIdBundle> _partitionContextIdProvider
+ = new Dictionary<string, PartitionDescriptorContextIdBundle>();
+
+ /// <summary>
+ /// Available partition descriptor and context ids stack
+ /// </summary>
+ private readonly Stack<PartitionDescriptorContextIdBundle> _availablePartitionDescriptorContextIds
+ = new Stack<PartitionDescriptorContextIdBundle>();
+
+ /// <summary>
+ /// Input partition data set
+ /// </summary>
private readonly IPartitionedInputDataSet _dataset;
+
+ /// <summary>
+ /// Configuration manager that provides configurations
+ /// </summary>
private readonly ConfigurationManager _configurationManager;
/// <summary>
- /// Constructs the object witch maintains partitionDescriptor Ids so that to provide proper data load configuration
+ /// Constructs the object which maintains partitionDescriptor Ids in order to provide proper data load configuration.
+ /// It also maintains the partitionDescriptor id and context id mapping to ensure the same context id is always assigned the same data partition.
+ /// This is to ensure that, if the tasks are added to the topology based on the sequence of context ids, the result is deterministic.
/// </summary>
/// <param name="dataset">partition input dataset</param>
/// <param name="configurationManager">Configuration manager that holds configurations for context and tasks</param>
internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset, ConfigurationManager configurationManager)
{
_dataset = dataset;
+ int contextSequenceNumber = 0;
foreach (var descriptor in _dataset)
{
- _partitionDescriptorIds.Push(descriptor.Id);
+ var contextId = string.Format("DataLoadingContext-{0}", contextSequenceNumber++);
+ _availablePartitionDescriptorContextIds.Push(new PartitionDescriptorContextIdBundle(descriptor.Id, contextId));
}
_configurationManager = configurationManager;
}
@@ -70,13 +92,19 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <returns>Whether failed evaluator is master or not</returns>
internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId)
{
- if (!_partitionIdProvider.ContainsKey(evaluatorId))
+ PartitionDescriptorContextIdBundle partitionDescriptor;
+ if (_partitionContextIdProvider.TryGetValue(evaluatorId, out partitionDescriptor))
+ {
+ _availablePartitionDescriptorContextIds.Push(partitionDescriptor);
+ _partitionContextIdProvider.Remove(evaluatorId);
+ }
+ else
{
- var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
- Exceptions.Throw(new Exception(msg), Logger);
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Partition descriptor for Failed evaluator:{0} not present",
+ evaluatorId);
+ throw new IMRUSystemException(msg);
}
- _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
- _partitionIdProvider.Remove(evaluatorId);
}
/// <summary>
@@ -105,13 +133,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <returns></returns>
internal string GetPartitionIdByEvaluatorId(string evaluatorId)
{
- if (!_partitionIdProvider.ContainsKey(evaluatorId))
+ PartitionDescriptorContextIdBundle partitionDescriptorContextId;
+ _partitionContextIdProvider.TryGetValue(evaluatorId, out partitionDescriptorContextId);
+
+ if (partitionDescriptorContextId == null)
{
- var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
+ throw new IMRUSystemException(msg);
}
-
- return _partitionIdProvider[evaluatorId];
+ return partitionDescriptorContextId.PartitionDescriptorId;
}
/// <summary>
@@ -119,51 +150,53 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// evaluator or new configuration
/// </summary>
/// <param name="evaluatorId"></param>
- /// <returns></returns>
+ /// <returns>Configuration for context and service</returns>
internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
{
- if (_partitionDescriptorIds.Count == 0)
+ try
{
- Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
+ Logger.Log(Level.Info, "Getting a new data loading configuration");
+ _partitionContextIdProvider.Add(evaluatorId, _availablePartitionDescriptorContextIds.Pop());
}
-
- if (_partitionIdProvider.ContainsKey(evaluatorId))
+ catch (InvalidOperationException e)
+ {
+ throw new IMRUSystemException("No more data configuration can be provided", e);
+ }
+ catch (ArgumentException e)
{
var msg =
string.Format(
CultureInfo.InvariantCulture,
"Evaluator Id:{0} already present in configuration cache, they have to be unique",
evaluatorId);
- Exceptions.Throw(new IMRUSystemException(msg), Logger);
+ throw new IMRUSystemException(msg, e);
}
- Logger.Log(Level.Info, "Getting a new data loading configuration");
- _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
-
try
{
+ var partitionIdContextId = _partitionContextIdProvider[evaluatorId];
IPartitionDescriptor partitionDescriptor =
- _dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]);
- return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId);
+ _dataset.GetPartitionDescriptorForId(partitionIdContextId.PartitionDescriptorId);
+ return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, partitionIdContextId.ContextId);
}
catch (Exception e)
{
- var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset",
- _partitionIdProvider[evaluatorId]);
+ var msg = string.Format(CultureInfo.InvariantCulture,
+ "Error while trying to access partition descriptor:{0} from dataset",
+ _partitionContextIdProvider[evaluatorId]);
Exceptions.Throw(e, msg, Logger);
return null;
}
}
/// <summary>
- /// Creates service and data loading context configuration for given evaluator id
+ /// Creates service and data loading context configuration for given context id and partition descriptor
/// </summary>
/// <param name="partitionDescriptor"></param>
- /// <param name="evaluatorId"></param>
- /// <returns></returns>
+ /// <param name="contextId"></param>
+ /// <returns>Configuration for context and service</returns>
private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
- IPartitionDescriptor partitionDescriptor,
- string evaluatorId)
+ IPartitionDescriptor partitionDescriptor, string contextId)
{
var dataLoadingContextConf =
TangFactory.GetTang()
@@ -185,7 +218,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
.Build();
var contextConf = ContextConfiguration.ConfigurationModule
- .Set(ContextConfiguration.Identifier, string.Format("DataLoading-{0}", evaluatorId))
+ .Set(ContextConfiguration.Identifier, contextId)
.Build();
return new ContextAndServiceConfiguration(contextConf, serviceConf);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index aaf93fb..be74b86 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -85,6 +85,7 @@ under the License.
<Compile Include="OnREEF\Driver\IMRUSystemException.cs" />
<Compile Include="OnREEF\Driver\IMRUConstants.cs" />
<Compile Include="OnREEF\Driver\IMRUDriver.cs" />
+ <Compile Include="OnREEF\Driver\PartitionDescriptorContextIdBundle.cs" />
<Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" />
<Compile Include="OnREEF\Driver\StateMachine\TaskStateMachine.cs" />
<Compile Include="OnREEF\Driver\StateMachine\StateTransition.cs" />