You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/09 22:55:14 UTC

incubator-reef git commit: [REEF-453] Fix a Race condition in GroupCommunication Client initialization

Repository: incubator-reef
Updated Branches:
  refs/heads/master 4853a2857 -> 4cca54bb8


[REEF-453] Fix a Race condition in GroupCommunication Client initialization

In the GroupCommClient constructor, we register the task node with the Name Server
first, and then instantiate CommunicationGroupClient, which instantiates the
Operators and registers their handlers. In the Operator constructors, we check
whether the operator's parents and children are all registered, based on the
topology. There is a chance that a parent or a child has already registered but
hasn't yet completed its initialization (e.g. its Operators have not yet
registered their message handlers), resulting in a race condition.

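This PR moves the Name Server registration to the end of the GroupCommClient
constructor, and moves the initialization check out of the Operator
constructors. After registering, the GroupCommClient constructor calls
WaitingForRegistration on each CommunicationGroupClient, which in turn calls
WaitForRegistration on each of its Operators. It also moves the registration of
the incoming message handler in WritableNetworkService from Register() into the
constructor. This ensures that a node registers with the Name Server only after
all of its constructors have run, and that the registration check happens only
after that registration.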

JIRA:
  [REEF-453](https://issues.apache.org/jira/browse/REEF-453)

Pull Request:
  This closes #278


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4cca54bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4cca54bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4cca54bb

Branch: refs/heads/master
Commit: 4cca54bb82240ef0002bf8c42e7a31e0d8a08c99
Parents: 4853a28
Author: Julia Wang <jw...@yahoo.com>
Authored: Mon Jul 6 16:02:11 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jul 9 13:52:24 2015 -0700

----------------------------------------------------------------------
 .../Config/GroupCommConfigurationOptions.cs     | 33 +++++++++++++-------
 .../Operators/IGroupCommOperatorInternal.cs     | 29 +++++++++++++++++
 .../Group/Operators/Impl/BroadcastReceiver.cs   | 20 ++++++++----
 .../Group/Operators/Impl/BroadcastSender.cs     | 22 ++++++++-----
 .../Group/Operators/Impl/ReduceReceiver.cs      | 20 ++++++++----
 .../Group/Operators/Impl/ReduceSender.cs        | 20 ++++++++----
 .../Group/Operators/Impl/ScatterReceiver.cs     | 20 ++++++++----
 .../Group/Operators/Impl/ScatterSender.cs       | 20 ++++++++----
 .../Task/ICommunicationGroupClientInternal.cs   | 29 +++++++++++++++++
 .../Group/Task/IOperatorTopology.cs             |  7 +++++
 .../Group/Task/Impl/CommunicationGroupClient.cs | 16 +++++++++-
 .../Group/Task/Impl/GroupCommClient.cs          | 16 +++++++---
 .../Group/Task/Impl/OperatorTopology.cs         | 10 ++++--
 .../NetworkService/WritableNetworkService.cs    | 10 +++---
 .../Org.Apache.REEF.Network.csproj              |  2 ++
 15 files changed, 211 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
index 50c0dd6..84c0e85 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
@@ -44,18 +44,27 @@ namespace Org.Apache.REEF.Network.Group.Config
         {
         }
 
-        [NamedParameter("Retry times", defaultValue: "5")]
-        public class RetryCount : Name<int>
-        {
-        }
-
-        [NamedParameter("sleep time to wait for handlers to be registered", defaultValue: "500")]
-        public class SleepTimeWaitingForHandler : Name<int>
-        {
-        }
-
-        [NamedParameter("Retry times to wait for handlers to be registered", defaultValue: "5")]
-        public class RetryCountWaitingForHanler : Name<int>
+        /// <summary>
+        /// Each Communication group needs to check and wait until all the other nodes in the group are registered to the NameServer
+        /// Sleep time is set between each retry. 
+        /// </summary>
+        [NamedParameter("sleep time to wait for nodes to be registered", defaultValue: "500")]
+        internal sealed class SleepTimeWaitingForRegistration : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Each Communication group needs to check and wait until all the other nodes in the group are registered to the NameServer
+        /// </summary>
+        /// <remarks>
+        /// When there are many nodes, e.g over 100, the waiting time might be pretty long. 
+        /// We don't want to set it too low in case some nodes are just slow, if we simply throw an exception, that is not right. 
+        /// We don't want it to try endlessly in case some node is really dead, we should come out with exception. 
+        /// We want it to return as soon as all nodes in the group are registered, So increasing retry count is better than increasing sleep time.
+        /// Current default sleep time is 500ms. Default retry is 500. Total is 250000ms, that is 250s, little bit more than 4 min
+        /// </remarks>
+        [NamedParameter("Retry times to wait for nodes to be registered", defaultValue: "500")]
+        internal sealed class RetryCountWaitingForRegistration : Name<int>
         {
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
new file mode 100644
index 0000000..d0df9be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    internal interface IGroupCommOperatorInternal
+    {
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void WaitForRegistration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index e7fcb52..f908ec8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -33,11 +33,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to receive broadcast messages in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The type of message being sent.</typeparam>
-    public sealed class BroadcastReceiver<T> : IBroadcastReceiver<T>
+    public sealed class BroadcastReceiver<T> : IBroadcastReceiver<T>, IGroupCommOperatorInternal
     {
         private const int PipelineVersion = 2;
         private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiver<T>));
+        private readonly bool _initialize;
 
         /// <summary>
         /// Creates a new BroadcastReceiver.
@@ -63,14 +64,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = PipelineVersion;
             PipelineDataConverter = dataConverter;
             _topology = topology;
+            _initialize = initialize;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
 
         /// <summary>
@@ -116,5 +113,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
             return PipelineDataConverter.FullMessage(messageList);
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 5457a70..0e69808 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -33,11 +33,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to send messages to child Tasks in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class BroadcastSender<T> : IBroadcastSender<T>
+    public sealed class BroadcastSender<T> : IBroadcastSender<T>, IGroupCommOperatorInternal
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastSender<T>));
         private const int PipelineVersion = 2;
         private readonly IOperatorTopology<PipelineMessage<T>> _topology;
+        private readonly bool _initialize;
 
         /// <summary>
         /// Creates a new BroadcastSender to send messages to other Tasks.
@@ -65,16 +66,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             GroupName = groupName;
             Version = PipelineVersion;
             PipelineDataConverter = dataConverter;
+            _initialize = initialize;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
-      
+
         /// <summary>
         /// Returns the identifier for the Group Communication operator.
         /// </summary>
@@ -113,5 +110,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
                 _topology.SendToChildren(message, MessageType.Data);
             }
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }      
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index 187bd58..be8632a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -33,12 +33,13 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication operator used to receive and reduce messages in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class ReduceReceiver<T> : IReduceReceiver<T>
+    public sealed class ReduceReceiver<T> : IReduceReceiver<T>, IGroupCommOperatorInternal
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceReceiver<T>));
         private const int PipelineVersion = 2;
         private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
+        private readonly bool _initialize;
 
         /// <summary>
         /// Creates a new ReduceReceiver.
@@ -67,17 +68,13 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = PipelineVersion;
             ReduceFunction = reduceFunction;
             PipelineDataConverter = dataConverter;
+            _initialize = initialize;
 
             _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
 
         /// <summary>
@@ -123,5 +120,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
             return PipelineDataConverter.FullMessage(messageList);
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 0c13d3c..bc161f1 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -34,12 +34,13 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication Operator used to send messages to be reduced by the ReduceReceiver in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class ReduceSender<T> : IReduceSender<T>
+    public sealed class ReduceSender<T> : IReduceSender<T>, IGroupCommOperatorInternal
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceSender<T>));
         private const int PipelineVersion = 2;
         private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
+        private readonly bool _initialize;
 
         /// <summary>
         /// Creates a new ReduceSender.
@@ -71,16 +72,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
             _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
+            _initialize = initialize;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             PipelineDataConverter = dataConverter;
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
 
         /// <summary>
@@ -143,5 +140,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
                 }
             }
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index a21656d..af53d07 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -33,10 +33,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// from the IScatterSender.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class ScatterReceiver<T> : IScatterReceiver<T>
+    public sealed class ScatterReceiver<T> : IScatterReceiver<T>, IGroupCommOperatorInternal
     {
         private const int DefaultVersion = 1;
         private readonly IOperatorTopology<T> _topology;
+        private readonly bool _initialize;
 
         /// <summary>
         /// Creates a new ScatterReceiver.
@@ -59,14 +60,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             GroupName = groupName;
             Version = DefaultVersion;
             _topology = topology;
+            _initialize = initialize;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
 
         /// <summary>
@@ -99,5 +96,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _topology.ScatterToChildren(elements, MessageType.Data);
             return elements.ToList();
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 42e7f6b..735d2a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -33,10 +33,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// of the IScatterReceivers.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class ScatterSender<T> : IScatterSender<T>
+    public sealed class ScatterSender<T> : IScatterSender<T>, IGroupCommOperatorInternal
     {
         private const int DefaultVersion = 1;
         private readonly IOperatorTopology<T> _topology;
+        private readonly bool _initialize ;
 
         /// <summary>
         /// Creates a new ScatterSender.
@@ -59,14 +60,10 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             GroupName = groupName;
             Version = DefaultVersion;
             _topology = topology;
+            _initialize = initialize;
 
             var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
-
-            if (initialize)
-            {
-                topology.Initialize();
-            }
         }
 
         public string OperatorName { get; private set; }
@@ -108,5 +105,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         {
             _topology.ScatterToChildren(elements, order, MessageType.Data);
         }
+
+        /// <summary>
+        /// Ensure all parent and children nodes in the topology are registered with teh Name Service.
+        /// </summary>
+        void IGroupCommOperatorInternal.WaitForRegistration()
+        {
+            if (_initialize)
+            {
+                _topology.Initialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
new file mode 100644
index 0000000..e112254
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    internal interface ICommunicationGroupClientInternal : ICommunicationGroupClient
+    {
+        /// <summary>
+        /// Call each Operator to easure all the nodes in the topology group has been registered
+        /// </summary>
+        void WaitingForRegistration();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
index 2e65080..12ec8bd 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
@@ -32,6 +32,13 @@ namespace Org.Apache.REEF.Network.Group.Task
     public interface IOperatorTopology<T>
     {
         /// <summary>
+        /// Initializes operator topology.
+        /// Waits until all Tasks in the CommunicationGroup have registered themselves
+        /// with the Name Service.
+        /// </summary>
+        void Initialize();
+
+        /// <summary>
         /// Sends the message to the parent Task.
         /// </summary>
         /// <param name="message">The message to send</param>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 1155048..305d245 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -19,6 +19,8 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
@@ -34,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// <summary>
     ///  Used by Tasks to fetch Group Communication Operators in the group configured by the driver.
     /// </summary>
-    public class CommunicationGroupClient : ICommunicationGroupClient
+    public class CommunicationGroupClient : ICommunicationGroupClientInternal
     {
         private readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupClient));
         private readonly Dictionary<string, object> _operators;
@@ -175,5 +177,17 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             return (T) op;
         }
+
+        /// <summary>
+        /// Call each Operator to easure all the nodes in the topology group has been registered
+        /// </summary>
+        void ICommunicationGroupClientInternal.WaitingForRegistration()
+        {
+            foreach (var op in _operators.Values)
+            {
+                var method = op.GetType().GetMethod("Org.Apache.REEF.Network.Group.Operators.IGroupCommOperatorInternal.WaitForRegistration", BindingFlags.NonPublic | BindingFlags.Instance);
+                method.Invoke(op, null);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index 8a8e696..e79df55 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     public sealed class GroupCommClient : IGroupCommClient
     {
-        private readonly Dictionary<string, ICommunicationGroupClient> _commGroups;
+        private readonly Dictionary<string, ICommunicationGroupClientInternal> _commGroups;
 
         private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService;
 
@@ -58,17 +58,23 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             AvroConfigurationSerializer configSerializer,
             IInjector injector)
         {
-            _commGroups = new Dictionary<string, ICommunicationGroupClient>();
+            _commGroups = new Dictionary<string, ICommunicationGroupClientInternal>();
             _networkService = networkService;
-            networkService.Register(new StringIdentifier(taskId));
 
             foreach (string serializedGroupConfig in groupConfigs)
             {
                 IConfiguration groupConfig = configSerializer.FromString(serializedGroupConfig);
                 IInjector groupInjector = injector.ForkInjector(groupConfig);
-                ICommunicationGroupClient commGroupClient = groupInjector.GetInstance<ICommunicationGroupClient>();
+                var commGroupClient = (ICommunicationGroupClientInternal)groupInjector.GetInstance<ICommunicationGroupClient>();
                 _commGroups[commGroupClient.GroupName] = commGroupClient;
             }
+
+            networkService.Register(new StringIdentifier(taskId));
+
+            foreach (var group in _commGroups.Values)
+            {
+               group.WaitingForRegistration();
+            }
         }
 
         /// <summary>
@@ -99,4 +105,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _networkService.Dispose();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 6ecf7f3..b1b79d4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -57,6 +57,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private string _driverId;
         private readonly int _timeout;
         private readonly int _retryCount;
+        private readonly int _sleepTime;
 
         private readonly NodeStruct<T> _parent;
         private readonly List<NodeStruct<T>> _children;
@@ -75,7 +76,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="taskId">The operator's Task identifier</param>
         /// <param name="driverId">The identifer for the driver</param>
         /// <param name="timeout">Timeout value for cancellation token</param>
-        /// <param name="retryCount">Number of times to retry registration</param>
+        /// <param name="retryCount">Number of times to retry wating for registration</param>
+        /// <param name="sleepTime">Sleep time between retry wating for registration</param>
         /// <param name="rootId">The identifier for the root Task in the topology graph</param>
         /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
         /// <param name="networkService">The network service</param>
@@ -88,7 +90,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
             [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
             [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout,
-            [Parameter(typeof(GroupCommConfigurationOptions.RetryCount))] int retryCount,
+            [Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount,
+            [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
             WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
@@ -101,6 +104,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _driverId = driverId;
             _timeout = timeout;
             _retryCount = retryCount;
+            _sleepTime = sleepTime;
             _nameClient = networkService.NamingClient;
             _sender = sender;
             _nodesWithData = new BlockingCollection<NodeStruct<T>>();
@@ -529,7 +533,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                     return;
                 }
 
-                Thread.Sleep(500);
+                Thread.Sleep(_sleepTime);
             }
 
             throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
index 7d9d015..93da126 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -69,6 +69,11 @@ namespace Org.Apache.REEF.Network.NetworkService
             _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
             _messageHandler = messageHandler;
 
+            // Create and register incoming message handler
+            // TODO[REEF-419] This should use the TcpPortProvider mechanism
+            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
+
             _nameClient = nameClient;
             _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
 
@@ -122,11 +127,6 @@ namespace Org.Apache.REEF.Network.NetworkService
             _localIdentifier = id;
             NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
 
-            // Create and register incoming message handler
-            // TODO[REEF-419] This should use the TcpPortProvider mechanism
-            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
-            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
-
             Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4cca54bb/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 7b167b4..73b6d9d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -67,6 +67,7 @@ under the License.
     <Compile Include="Group\Operators\IBroadcastReceiver.cs" />
     <Compile Include="Group\Operators\IBroadcastSender.cs" />
     <Compile Include="Group\Operators\IGroupCommOperator.cs" />
+    <Compile Include="Group\Operators\IGroupCommOperatorInternal.cs" />
     <Compile Include="Group\Operators\Impl\BroadcastOperatorSpec.cs" />
     <Compile Include="Group\Operators\Impl\BroadcastReceiver.cs" />
     <Compile Include="Group\Operators\Impl\BroadcastSender.cs" />
@@ -81,6 +82,7 @@ under the License.
     <Compile Include="Group\Operators\Impl\Sender.cs" />
     <Compile Include="Group\Operators\IOperatorSpec.cs" />
     <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
+    <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
     <Compile Include="Group\Task\IOperatorTopology.cs" />
     <Compile Include="Group\Operators\IReduceFunction.cs" />
     <Compile Include="Group\Operators\IReduceReceiver.cs" />