You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2017/08/30 22:54:29 UTC

reef git commit: [REEF-1842] Making IMRU task and input data association deterministic

Repository: reef
Updated Branches:
  refs/heads/master d609c002c -> 4d1b80e10


[REEF-1842]  Making IMRU task and input data association deterministic

   * Pair each partition descriptor with a fixed context id (DataLoadingContext-n) so that a given context id is always associated with the same partition descriptor. Note that partition descriptor ids follow the pattern RandomInputPartition-x and are unique.
   * Store active contexts in a sorted collection so that tasks, together with their associated context/partition descriptor, are always added to the communication group in the same order.

JIRA: [REEF-1842](https://issues.apache.org/jira/browse/REEF-1842)

This closes #1346


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4d1b80e1
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4d1b80e1
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4d1b80e1

Branch: refs/heads/master
Commit: 4d1b80e107da81510ec15e9ca72569fc6cbbac72
Parents: d609c00
Author: Julia Wang <ju...@apache.org>
Authored: Wed Jul 26 19:04:17 2017 -0700
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Wed Aug 30 15:49:51 2017 -0700

----------------------------------------------------------------------
 .../OnREEF/Driver/ActiveContextManager.cs       |   4 +-
 .../OnREEF/Driver/IMRUDriver.cs                 |   1 +
 .../PartitionDescriptorContextIdBundle.cs       |  41 ++++++++
 .../ServiceAndContextConfigurationProvider.cs   | 101 ++++++++++++-------
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 5 files changed, 112 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
index 437b76f..d73abf6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
@@ -35,7 +35,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager));
-        private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>();
+        private readonly IDictionary<string, IActiveContext> _activeContexts = new SortedDictionary<string, IActiveContext>(StringComparer.Ordinal); // ordinal comparer: key order must not depend on the machine's current culture
         private readonly int _totalExpectedContexts;
         private IObserver<IEnumerable<IActiveContext>> _activeContextObserver;
 
@@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             if (AreAllContextsReceived && _activeContextObserver != null)
             {
-                _activeContextObserver.OnNext(_activeContexts.Values);
+                _activeContextObserver.OnNext(ActiveContexts);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 308a079..3db1bcd 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -392,6 +392,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                         : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId);
                     commGroup.AddTask(taskId);
                     taskIdAndContextMapping.Add(taskId, activeContext);
+                    Logger.Log(Level.Info, "Adding {0} with associated context: {1} to communication group: {2}.", taskId, activeContext.Id, IMRUConstants.CommunicationGroupName);
                 }
 
                 foreach (var mapping in taskIdAndContextMapping)

http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
new file mode 100644
index 0000000..bb0b118
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Pairs a partition descriptor id with a context id so that the context with a given id is always bundled with the same partition data.
+    /// </summary>
+    internal sealed class PartitionDescriptorContextIdBundle
+    {
+        /// <summary>
+        /// Creates a PartitionDescriptorContextIdBundle that records the pairing.
+        /// If an evaluator fails, its bundle is pushed back for reuse, so the next data loading context created receives the same context id and partition data.
+        /// </summary>
+        /// <param name="partitionDescriptorId">Id of the partition descriptor whose data the context loads</param>
+        /// <param name="contextId">Id assigned to the data loading context for that partition</param>
+        internal PartitionDescriptorContextIdBundle(string partitionDescriptorId, string contextId)
+        {
+            PartitionDescriptorId = partitionDescriptorId;
+            ContextId = contextId;
+        }
+
+        internal string PartitionDescriptorId { get; private set; }
+
+        internal string ContextId { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 0d4e27b..a5c8422 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -40,24 +40,46 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     [NotThreadSafe]
     internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>
     {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
+        private static readonly Logger Logger 
+            = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>));
 
-        private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>();
-        private readonly Stack<string> _partitionDescriptorIds = new Stack<string>();
+        /// <summary>
+        /// Mapping from evaluator id to the partition descriptor/context id bundle currently assigned to it
+        /// </summary>
+        private readonly Dictionary<string, PartitionDescriptorContextIdBundle> _partitionContextIdProvider 
+            = new Dictionary<string, PartitionDescriptorContextIdBundle>();
+
+        /// <summary>
+        /// Stack of partition descriptor/context id bundles not currently assigned to any evaluator
+        /// </summary>
+        private readonly Stack<PartitionDescriptorContextIdBundle> _availablePartitionDescriptorContextIds
+            = new Stack<PartitionDescriptorContextIdBundle>();
+
+        /// <summary>
+        /// Partitioned input data set; one bundle is created per partition descriptor
+        /// </summary>
         private readonly IPartitionedInputDataSet _dataset;
+
+        /// <summary>
+        /// Configuration manager that provides configurations
+        /// </summary>
         private readonly ConfigurationManager _configurationManager;
 
         /// <summary>
-        /// Constructs the object witch maintains partitionDescriptor Ids so that to provide proper data load configuration 
+        /// Constructs the object which maintains partition descriptor ids in order to provide the proper data loading configuration.
+        /// Each partition descriptor id is paired with a fixed context id (DataLoadingContext-n, numbered in dataset enumeration order),
+        /// so a given context id is always assigned the same data partition and adding tasks in context id order is deterministic.
         /// </summary>
         /// <param name="dataset">partition input dataset</param>
         /// <param name="configurationManager">Configuration manager that holds configurations for context and tasks</param>
         internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset, ConfigurationManager configurationManager)
         {
             _dataset = dataset;
+            int contextSequenceNumber = 0;
             foreach (var descriptor in _dataset)
             {
-                _partitionDescriptorIds.Push(descriptor.Id);
+                var contextId = string.Format(CultureInfo.InvariantCulture, "DataLoadingContext-{0}", contextSequenceNumber++);
+                _availablePartitionDescriptorContextIds.Push(new PartitionDescriptorContextIdBundle(descriptor.Id, contextId));
             }
             _configurationManager = configurationManager;
         }
@@ -70,13 +92,19 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <returns>Whether failed evaluator is master or not</returns>
         internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId)
         {
-            if (!_partitionIdProvider.ContainsKey(evaluatorId))
+            PartitionDescriptorContextIdBundle assignedBundle;
+            if (_partitionContextIdProvider.TryGetValue(evaluatorId, out assignedBundle))
+            {
+                _availablePartitionDescriptorContextIds.Push(assignedBundle); // the next data loading configuration request gets this same partition/context id pair
+                _partitionContextIdProvider.Remove(evaluatorId);
+            }
+            else
             {
-                var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId);
-                Exceptions.Throw(new Exception(msg), Logger);
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                    "Partition descriptor for Failed evaluator:{0} not present",
+                    evaluatorId);
+                throw new IMRUSystemException(msg);
             }
-            _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
-            _partitionIdProvider.Remove(evaluatorId);
         }
 
         /// <summary>
@@ -105,13 +133,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <returns></returns>
         internal string GetPartitionIdByEvaluatorId(string evaluatorId)
         {
-            if (!_partitionIdProvider.ContainsKey(evaluatorId))
+            PartitionDescriptorContextIdBundle assignedBundle;
+            bool isAssigned = _partitionContextIdProvider.TryGetValue(evaluatorId, out assignedBundle);
+
+            if (!isAssigned)
             {
-                var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
-                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                    "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId);
+                throw new IMRUSystemException(msg);
             }
-
-            return _partitionIdProvider[evaluatorId];
+            return assignedBundle.PartitionDescriptorId;
         }
 
         /// <summary>
@@ -119,51 +150,53 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// evaluator or new configuration
         /// </summary>
         /// <param name="evaluatorId"></param>
-        /// <returns></returns>
+        /// <returns>Configuration for context and service</returns>
         internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
         {
-            if (_partitionDescriptorIds.Count == 0)
+            try
             {
-                Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger);
+                Logger.Log(Level.Info, "Getting a new data loading configuration");
+                _partitionContextIdProvider.Add(evaluatorId, _availablePartitionDescriptorContextIds.Peek()); // Peek, not Pop: if Add throws (duplicate id), the bundle must stay available
             }
-
-            if (_partitionIdProvider.ContainsKey(evaluatorId))
+            catch (InvalidOperationException e)
+            {
+                throw new IMRUSystemException("No more data configuration can be provided", e);
+            }
+            catch (ArgumentException e)
             {
                 var msg =
                     string.Format(
                         CultureInfo.InvariantCulture,
                         "Evaluator Id:{0} already present in configuration cache, they have to be unique",
                         evaluatorId);
-                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+                throw new IMRUSystemException(msg, e);
             }
 
-            Logger.Log(Level.Info, "Getting a new data loading configuration");
-            _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
-
             try
             {
+                var partitionIdContextId = _availablePartitionDescriptorContextIds.Pop(); // same bundle Add recorded above; now remove it from the available pool
                 IPartitionDescriptor partitionDescriptor =
-                    _dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]);
-                return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId);
+                    _dataset.GetPartitionDescriptorForId(partitionIdContextId.PartitionDescriptorId);
+                return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, partitionIdContextId.ContextId);
             }
             catch (Exception e)
             {
-                var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset",
-                    _partitionIdProvider[evaluatorId]);
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                    "Error while trying to access partition descriptor:{0} from dataset",
+                    _partitionContextIdProvider[evaluatorId].PartitionDescriptorId);
                 Exceptions.Throw(e, msg, Logger);
                 return null;
             }
         }
 
         /// <summary>
-        /// Creates service and data loading context configuration for given evaluator id
+        /// Creates service and data loading context configuration for given context id and partition descriptor
         /// </summary>
         /// <param name="partitionDescriptor"></param>
-        /// <param name="evaluatorId"></param>
-        /// <returns></returns>
+        /// <param name="contextId"></param>
+        /// <returns>Configuration for context and service</returns>
         private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration(
-            IPartitionDescriptor partitionDescriptor,
-            string evaluatorId)
+            IPartitionDescriptor partitionDescriptor, string contextId)
         {
             var dataLoadingContextConf =
                 TangFactory.GetTang()
@@ -185,7 +218,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     .Build();
 
             var contextConf = ContextConfiguration.ConfigurationModule
-                .Set(ContextConfiguration.Identifier, string.Format("DataLoading-{0}", evaluatorId))
+                .Set(ContextConfiguration.Identifier, contextId)
                 .Build();
             return new ContextAndServiceConfiguration(contextConf, serviceConf);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index aaf93fb..be74b86 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -85,6 +85,7 @@ under the License.
     <Compile Include="OnREEF\Driver\IMRUSystemException.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
     <Compile Include="OnREEF\Driver\IMRUDriver.cs" />
+    <Compile Include="OnREEF\Driver\PartitionDescriptorContextIdBundle.cs" />
     <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" />
     <Compile Include="OnREEF\Driver\StateMachine\TaskStateMachine.cs" />
     <Compile Include="OnREEF\Driver\StateMachine\StateTransition.cs" />