You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/11 23:58:07 UTC

[2/4] incubator-reef git commit: [REEF-150] Adding group communication to REEF .Net

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
new file mode 100644
index 0000000..ffc5165
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator a task uses to send data that the ReduceReceiver aggregates.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ReduceSender<T> : IReduceSender<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly OperatorTopology<T> _topology;
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+
+        /// <summary>
+        /// Initializes the topology and registers an observer forwarding incoming messages to it.
+        /// </summary>
+        /// <param name="operatorName">Name of the reduce operator; also the key used for registration</param>
+        /// <param name="groupName">Name of the CommunicationGroup the operator belongs to</param>
+        /// <param name="topology">Operator topology graph of this task</param>
+        /// <param name="networkHandler">Observer with which the message handler is registered</param>
+        [Inject]
+        public ReduceSender(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            _topology = topology;
+            _networkHandler = networkHandler;
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _topology.Initialize();
+            var forwarder = Observer.Create<GroupCommunicationMessage>(msg => _topology.OnNext(msg));
+            _networkHandler.Register(operatorName, forwarder);
+        }
+
+        /// <summary>
+        /// Gets the reduce operator's name as passed to the constructor.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Gets the name of the CommunicationGroup this operator belongs to.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Gets the operator version; this class always sets it to DefaultVersion (1).
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Hands the data to the topology's SendToParent as a Data message.
+        /// </summary>
+        /// <param name="data">The data to send</param>
+        /// <exception cref="ArgumentNullException">If data is null</exception>
+        public void Send(T data)
+        {
+            if (data == null)
+            {
+                throw new ArgumentNullException("data");
+            }
+
+            _topology.SendToParent(data, MessageType.Data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
new file mode 100644
index 0000000..8219eb6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Specification of a Scatter MPI operator: the sending task and the message codec.
+    /// </summary>
+    /// <typeparam name="T">The message type handled by the codec</typeparam>
+    public class ScatterOperatorSpec<T> : IOperatorSpec<T>
+    {
+        /// <summary>
+        /// Gets the identifier of the task that splits a list of messages
+        /// and scatters it to the other tasks.
+        /// </summary>
+        public string SenderId { get; private set; }
+
+        /// <summary>
+        /// Gets the codec that serializes and deserializes the messages.
+        /// </summary>
+        public ICodec<T> Codec { get; private set; }
+
+        /// <summary>
+        /// Builds a specification from the given sender and codec.
+        /// Both arguments are stored as given, without validation.
+        /// </summary>
+        /// <param name="senderId">Identifier of the task that will send the messages</param>
+        /// <param name="codec">Codec used to serialize and deserialize the messages</param>
+        public ScatterOperatorSpec(string senderId, ICodec<T> codec)
+        {
+            Codec = codec;
+            SenderId = senderId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
new file mode 100644
index 0000000..85b5c13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator used to receive a sublist of messages sent from the IScatterSender.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ScatterReceiver<T> : IScatterReceiver<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+        private readonly OperatorTopology<T> _topology;
+
+        /// <summary>
+        /// Creates a new ScatterReceiver, initializes its topology and registers its message handler.
+        /// </summary>
+        /// <param name="operatorName">The name of the scatter operator</param>
+        /// <param name="groupName">The name of the operator's CommunicationGroup</param>
+        /// <param name="topology">The task's operator topology graph</param>
+        /// <param name="networkHandler">Handles incoming messages from other tasks</param>
+        [Inject]
+        public ScatterReceiver(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _networkHandler = networkHandler;
+            _topology = topology;
+            _topology.Initialize();
+
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            _networkHandler.Register(operatorName, msgHandler);
+        }
+
+        /// <summary>
+        /// Returns the name of the scatter operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Returns the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Returns the operator version.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Never assigned in this class, so it always returns null. NOTE(review): looks like
+        /// a leftover copied from the reduce operators; confirm whether it can be removed.
+        /// </summary>
+        public IReduceFunction<T> ReduceFunction { get; private set; }
+
+        /// <summary>
+        /// Receives a sublist of messages sent from the IScatterSender.
+        /// </summary>
+        /// <returns>The list returned by the topology's ReceiveListFromParent</returns>
+        public List<T> Receive()
+        {
+            return _topology.ReceiveListFromParent();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
new file mode 100644
index 0000000..ee9e683
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator that scatters a list of elements to all of the IScatterReceivers.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ScatterSender<T> : IScatterSender<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly OperatorTopology<T> _topology;
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+
+        /// <summary>
+        /// Initializes the topology and registers an observer forwarding incoming messages to it.
+        /// </summary>
+        /// <param name="operatorName">Name of the scatter operator; also the key used for registration</param>
+        /// <param name="groupName">Name of the Communication Group the operator belongs to</param>
+        /// <param name="topology">The operator topology</param>
+        /// <param name="networkHandler">Observer with which the message handler is registered</param>
+        [Inject]
+        public ScatterSender(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            _topology = topology;
+            _networkHandler = networkHandler;
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _topology.Initialize();
+            var forwarder = Observer.Create<GroupCommunicationMessage>(msg => _topology.OnNext(msg));
+            _networkHandler.Register(operatorName, forwarder);
+        }
+
+        /// <summary>Gets the scatter operator's name as passed to the constructor.</summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>Gets the name of the Communication Group this operator belongs to.</summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>Gets the operator version; this class always sets it to DefaultVersion (1).</summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Splits the list of elements evenly and scatters each chunk
+        /// to the IScatterReceivers through the topology.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        public void Send(List<T> elements)
+        {
+            _topology.ScatterToChildren(elements, MessageType.Data);
+        }
+
+        /// <summary>
+        /// Splits the list of elements and scatters each chunk
+        /// to the IScatterReceivers through the topology, so that
+        /// every receiver gets a sublist of the specified size.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        /// <param name="count">The size of each sublist</param>
+        public void Send(List<T> elements, int count)
+        {
+            _topology.ScatterToChildren(elements, count, MessageType.Data);
+        }
+
+        /// <summary>
+        /// Splits the list of elements and scatters each chunk
+        /// to the IScatterReceivers in the specified task order.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        /// <param name="order">Task identifiers giving the order in which the sublists are scattered</param>
+        public void Send(List<T> elements, List<string> order)
+        {
+            _topology.ScatterToChildren(elements, order, MessageType.Data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
new file mode 100644
index 0000000..c5ca60f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator used to do point-to-point communication between named Tasks.
+    /// </summary>
+    public class Sender
+    {
+        private readonly INetworkService<GroupCommunicationMessage> _networkService;
+        private readonly IIdentifierFactory _idFactory;
+
+        /// <summary>
+        /// Creates a new Sender.
+        /// </summary>
+        /// <param name="networkService">The network services used to send messages.</param>
+        /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param>
+        [Inject]
+        public Sender(
+            NetworkService<GroupCommunicationMessage> networkService,
+            IIdentifierFactory idFactory)
+        {
+            _networkService = networkService;
+            _idFactory = idFactory;
+        }
+
+        /// <summary>
+        /// Opens a new connection to the Task named by message.Destination and writes the message to it.
+        /// </summary>
+        /// <param name="message">The message to send.</param>
+        /// <exception cref="ArgumentNullException">If message is null.</exception>
+        /// <exception cref="ArgumentException">If message.Destination is null or empty.</exception>
+        public void Send(GroupCommunicationMessage message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+            if (string.IsNullOrEmpty(message.Destination))
+            {
+                throw new ArgumentException("Message destination cannot be null or empty", "message");
+            }
+
+            var conn = _networkService.NewConnection(_idFactory.Create(message.Destination));
+            conn.Open(); // NOTE(review): never closed in this method; confirm who owns the connection
+            conn.Write(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
new file mode 100644
index 0000000..9b96a9a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Used by Tasks to fetch the MPI operators of a communication group configured by the driver.
+    /// </summary>
+    [DefaultImplementation(typeof(CommunicationGroupClient))]
+    public interface ICommunicationGroupClient
+    {
+        /// <summary>
+        /// Gets the name of this Communication Group.
+        /// </summary>
+        string GroupName { get; }
+
+        /// <summary>
+        /// Gets the IBroadcastSender with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastSender for that operator</returns>
+        IBroadcastSender<T> GetBroadcastSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the IBroadcastReceiver with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastReceiver for that operator</returns>
+        IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the IReduceSender with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceSender for that operator</returns>
+        IReduceSender<T> GetReduceSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the IReduceReceiver with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceReceiver for that operator</returns>
+        IReduceReceiver<T> GetReduceReceiver<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the IScatterSender with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterSender for that operator</returns>
+        IScatterSender<T> GetScatterSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the IScatterReceiver with the given operator name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterReceiver for that operator</returns>
+        IScatterReceiver<T> GetScatterReceiver<T>(string operatorName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
new file mode 100644
index 0000000..d3034fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Observer of the GroupCommunicationMessages sent to this Communication Group.
+    /// </summary>
+    [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
+    public interface ICommunicationGroupNetworkObserver : IObserver<GroupCommunicationMessage>
+    {
+        /// <summary>
+        /// Registers a per-operator handler with this observer.
+        /// Messages that are to be sent to the operator specified by operatorName
+        /// are handled by the given observer.
+        /// </summary>
+        /// <param name="operatorName">The name of the operator whose handler
+        /// will be invoked</param>
+        /// <param name="observer">The handler to invoke when messages are sent
+        /// to the operator specified by operatorName</param>
+        void Register(string operatorName, IObserver<GroupCommunicationMessage> observer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
new file mode 100644
index 0000000..2592b04
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Used by Tasks to fetch CommunicationGroupClients; the client itself is IDisposable.
+    /// </summary>
+    [DefaultImplementation(typeof(MpiClient))]
+    public interface IMpiClient : IDisposable
+    {
+        /// <summary>
+        /// Gets the ICommunicationGroupClient with the given group name.
+        /// </summary>
+        /// <param name="groupName">The name of the Communication Group to look up</param>
+        /// <returns>The configured CommunicationGroupClient</returns>
+        ICommunicationGroupClient GetCommunicationGroup(string groupName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
new file mode 100644
index 0000000..5fe948c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Codec;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Observer of all incoming NsMessages carrying GroupCommunicationMessages for this Task.
+    /// </summary>
+    [DefaultImplementation(typeof(MpiNetworkObserver))]
+    public interface IMpiNetworkObserver : IObserver<NsMessage<GroupCommunicationMessage>>
+    {
+        /// <summary>
+        /// Registers the network handler for the given CommunicationGroup.
+        /// When messages are sent to the specified group name, the given handler
+        /// will be invoked with that message.
+        /// </summary>
+        /// <param name="groupName">The name of the group the handler serves</param>
+        /// <param name="commGroupHandler">The network handler to invoke when
+        /// messages are sent to the given group.</param>
+        void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
new file mode 100644
index 0000000..e6d653d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Used by Tasks to fetch MPI Operators in the group configured by the driver.
+    /// </summary>
+    /// <remarks>
+    /// One injector is stored per configured operator name. An operator is only
+    /// instantiated the first time it is requested; afterwards the same instance
+    /// is returned for that operator name.
+    /// </remarks>
+    public class CommunicationGroupClient : ICommunicationGroupClient
+    {
+        // The logger does not depend on instance state, so it is shared by all clients.
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupClient));
+
+        private readonly string _taskId;
+        private readonly string _driverId;
+
+        // Operator name -> injector built from that operator's serialized configuration.
+        private readonly Dictionary<string, IInjector> _operatorInjectors;
+
+        // Operator name -> operator instance that has already been created.
+        private readonly Dictionary<string, object> _operators;
+        private readonly NetworkService<GroupCommunicationMessage> _networkService;
+        private readonly IMpiNetworkObserver _mpiNetworkHandler;
+        private readonly ICommunicationGroupNetworkObserver _commGroupNetworkHandler;
+
+        /// <summary>
+        /// Creates a new CommunicationGroupClient.
+        /// </summary>
+        /// <param name="taskId">The identifier for this Task.</param>
+        /// <param name="groupName">The name of the CommunicationGroup</param>
+        /// <param name="driverId">The identifier for the driver</param>
+        /// <param name="operatorConfigs">The serialized operator configurations</param>
+        /// <param name="mpiNetworkObserver">The handler for all incoming messages
+        /// across all Communication Groups</param>
+        /// <param name="networkService">The network service used to send messages.</param>
+        /// <param name="configSerializer">Used to deserialize operator configuration.</param>
+        [Inject]
+        public CommunicationGroupClient(
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+            [Parameter(typeof(MpiConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs,
+            IMpiNetworkObserver mpiNetworkObserver,
+            NetworkService<GroupCommunicationMessage> networkService,
+            AvroConfigurationSerializer configSerializer)
+        {
+            _taskId = taskId;
+            _driverId = driverId;
+            GroupName = groupName;
+
+            _operators = new Dictionary<string, object>();
+            _operatorInjectors = new Dictionary<string, IInjector>();
+
+            _networkService = networkService;
+            _mpiNetworkHandler = mpiNetworkObserver;
+
+            // Route every message addressed to this group through a group-level observer.
+            _commGroupNetworkHandler = new CommunicationGroupNetworkObserver();
+            _mpiNetworkHandler.Register(groupName, _commGroupNetworkHandler);
+
+            // Deserialize operator configuration and store each injector.
+            // When user requests the MPI Operator, use type information to
+            // create the instance.
+            foreach (string operatorConfigStr in operatorConfigs)
+            {
+                IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr);
+
+                IInjector injector = TangFactory.GetTang().NewInjector(operatorConfig);
+                string operatorName = injector.GetNamedInstance<MpiConfigurationOptions.OperatorName, string>(
+                    GenericType<MpiConfigurationOptions.OperatorName>.Class);
+                _operatorInjectors[operatorName] = injector;
+            }
+        }
+
+        /// <summary>
+        /// Returns the Communication Group name
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Gets the BroadcastSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastSender</returns>
+        public IBroadcastSender<T> GetBroadcastSender<T>(string operatorName)
+        {
+            return GetOperatorInstance<BroadcastSender<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the BroadcastReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastReceiver</returns>
+        public IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName)
+        {
+            return GetOperatorInstance<BroadcastReceiver<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the ReduceSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceSender</returns>
+        public IReduceSender<T> GetReduceSender<T>(string operatorName)
+        {
+            return GetOperatorInstance<ReduceSender<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the ReduceReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceReceiver</returns>
+        public IReduceReceiver<T> GetReduceReceiver<T>(string operatorName)
+        {
+            return GetOperatorInstance<ReduceReceiver<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the ScatterSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterSender</returns>
+        public IScatterSender<T> GetScatterSender<T>(string operatorName)
+        {
+            return GetOperatorInstance<ScatterSender<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the ScatterReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterReceiver</returns>
+        public IScatterReceiver<T> GetScatterReceiver<T>(string operatorName)
+        {
+            return GetOperatorInstance<ScatterReceiver<T>>(operatorName);
+        }
+
+        /// <summary>
+        /// Gets the MPI operator with the specified name and type.
+        /// If the operator hasn't been instantiated yet, find the injector
+        /// associated with the given operator name and use the type information
+        /// to create a new operator of that type.
+        /// </summary>
+        /// <typeparam name="T">The type of operator to create</typeparam>
+        /// <param name="operatorName">The name of the operator</param>
+        /// <returns>The MPI Operator registered under the given name</returns>
+        /// <exception cref="ArgumentNullException">If operatorName is null or empty</exception>
+        /// <exception cref="ArgumentException">If no operator with the given name
+        /// was configured for this communication group</exception>
+        /// <exception cref="InjectionException">If the operator cannot be injected</exception>
+        private T GetOperatorInstance<T>(string operatorName) where T : class
+        {
+            if (string.IsNullOrEmpty(operatorName))
+            {
+                throw new ArgumentNullException("operatorName");
+            }
+
+            // Single lookup instead of ContainsKey followed by the indexer.
+            IInjector injector;
+            if (!_operatorInjectors.TryGetValue(operatorName, out injector))
+            {
+                throw new ArgumentException(
+                    "Invalid operator name, no operator named '" + operatorName +
+                    "' is configured for communication group '" + GroupName + "'");
+            }
+
+            object op;
+            if (!_operators.TryGetValue(operatorName, out op))
+            {
+                // Supply the task-level values and shared services the operator needs.
+                injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, _taskId);
+                injector.BindVolatileParameter(GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class, GroupName);
+                injector.BindVolatileInstance(GenericType<ICommunicationGroupNetworkObserver>.Class, _commGroupNetworkHandler);
+                injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, _networkService);
+                injector.BindVolatileInstance(GenericType<ICommunicationGroupClient>.Class, this);
+
+                try
+                {
+                    op = injector.GetInstance<T>();
+                    _operators[operatorName] = op;
+                }
+                catch (InjectionException)
+                {
+                    LOGGER.Log(Level.Error, "Cannot inject MPI operator: No known operator of type: {0}", typeof(T));
+                    throw;
+                }
+            }
+
+            // The cache is keyed by name only, so this cast throws InvalidCastException
+            // when the name was first requested with a different operator type.
+            return (T) op;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
new file mode 100644
index 0000000..97ab082
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Handles incoming messages sent to this Communication Group.
+    /// Each message is forwarded to the handler registered under the
+    /// operator name carried by the message.
+    /// </summary>
+    public class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
+    {
+        // Operator name -> handler to invoke for messages addressed to that operator.
+        private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers;
+
+        /// <summary>
+        /// Creates a new CommunicationGroupNetworkObserver.
+        /// </summary>
+        [Inject]
+        public CommunicationGroupNetworkObserver()
+        {
+            _handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
+        }
+
+        /// <summary>
+        /// Registers the handler with the CommunicationGroupNetworkObserver.
+        /// Messages that are to be sent to the operator specified by operatorName
+        /// are handled by the given observer. Registering a second handler under
+        /// the same operator name replaces the first one.
+        /// </summary>
+        /// <param name="operatorName">The name of the operator whose handler
+        /// will be invoked</param>
+        /// <param name="observer">The handler to invoke when messages are sent
+        /// to the operator specified by operatorName</param>
+        /// <exception cref="ArgumentNullException">If operatorName is null or
+        /// empty, or observer is null</exception>
+        public void Register(string operatorName, IObserver<GroupCommunicationMessage> observer)
+        {
+            if (string.IsNullOrEmpty(operatorName))
+            {
+                throw new ArgumentNullException("operatorName");
+            }
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            _handlers[operatorName] = observer;
+        }
+
+        /// <summary>
+        /// Handles the incoming GroupCommunicationMessage sent to this Communication Group.
+        /// Looks for the operator that the message is being sent to and invoke its handler.
+        /// </summary>
+        /// <param name="message">The incoming message</param>
+        /// <exception cref="ArgumentNullException">If message is null</exception>
+        /// <exception cref="ArgumentException">If no handler is registered for the
+        /// operator name carried by the message</exception>
+        public void OnNext(GroupCommunicationMessage message)
+        {
+            // Fail with a descriptive exception instead of a NullReferenceException.
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            string operatorName = message.OperatorName;
+
+            IObserver<GroupCommunicationMessage> handler;
+            if (!_handlers.TryGetValue(operatorName, out handler))
+            {
+                throw new ArgumentException("No handler registered with the operator name: " + operatorName);
+            }
+
+            handler.OnNext(message);
+        }
+
+        /// <summary>
+        /// Does nothing; errors are not propagated to the registered handlers.
+        /// </summary>
+        /// <param name="error">The error that was reported (ignored)</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Does nothing; completion is not propagated to the registered handlers.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
new file mode 100644
index 0000000..7cec022
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Entry point Tasks use to look up their CommunicationGroupClients by name.
+    /// </summary>
+    public class MpiClient : IMpiClient
+    {
+        // Group name -> client created from that group's serialized configuration.
+        private readonly Dictionary<string, ICommunicationGroupClient> _commGroups =
+            new Dictionary<string, ICommunicationGroupClient>();
+
+        private readonly INetworkService<GroupCommunicationMessage> _networkService;
+
+        /// <summary>
+        /// Builds the MpiClient: registers the task ID with the network service
+        /// and creates one CommunicationGroupClient per serialized group configuration.
+        /// </summary>
+        /// <param name="groupConfigs">The set of serialized Group Communication configurations</param>
+        /// <param name="taskId">The identifier for this task</param>
+        /// <param name="mpiNetworkObserver">The network handler to receive incoming messages
+        /// for this task</param>
+        /// <param name="networkService">The network service used to send messages</param>
+        /// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
+        [Inject]
+        public MpiClient(
+            [Parameter(typeof(MpiConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+            IMpiNetworkObserver mpiNetworkObserver,
+            NetworkService<GroupCommunicationMessage> networkService,
+            AvroConfigurationSerializer configSerializer)
+        {
+            _networkService = networkService;
+
+            StringIdentifier taskIdentifier = new StringIdentifier(taskId);
+            networkService.Register(taskIdentifier);
+
+            foreach (string serializedConfig in groupConfigs)
+            {
+                IConfiguration deserialized = configSerializer.FromString(serializedConfig);
+                IInjector groupInjector = TangFactory.GetTang().NewInjector(deserialized);
+
+                // Hand the task-level values and shared services to the group client.
+                groupInjector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, taskId);
+                groupInjector.BindVolatileInstance(GenericType<IMpiNetworkObserver>.Class, mpiNetworkObserver);
+                groupInjector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, networkService);
+
+                ICommunicationGroupClient groupClient = groupInjector.GetInstance<ICommunicationGroupClient>();
+                _commGroups[groupClient.GroupName] = groupClient;
+            }
+        }
+
+        /// <summary>
+        /// Looks up the CommunicationGroupClient registered under the given group name.
+        /// </summary>
+        /// <param name="groupName">The name of the CommunicationGroupClient</param>
+        /// <returns>The CommunicationGroupClient</returns>
+        /// <exception cref="ArgumentNullException">If groupName is null or empty</exception>
+        /// <exception cref="ArgumentException">If no client exists for groupName</exception>
+        public ICommunicationGroupClient GetCommunicationGroup(string groupName)
+        {
+            if (string.IsNullOrEmpty(groupName))
+            {
+                throw new ArgumentNullException("groupName");
+            }
+
+            ICommunicationGroupClient found;
+            if (_commGroups.TryGetValue(groupName, out found))
+            {
+                return found;
+            }
+
+            throw new ArgumentException("No CommunicationGroupClient with name: " + groupName);
+        }
+
+        /// <summary>
+        /// Unregisters from the network service and then disposes it.
+        /// </summary>
+        public void Dispose()
+        {
+            _networkService.Unregister();
+            _networkService.Dispose();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
new file mode 100644
index 0000000..baa2e5e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Handles all incoming messages for this Task.
+    /// Each message is forwarded to the handler registered for the
+    /// Communication Group named in the message.
+    /// </summary>
+    public class MpiNetworkObserver : IMpiNetworkObserver
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MpiNetworkObserver));
+
+        // Group name -> handler to invoke for messages addressed to that group.
+        private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _commGroupHandlers;
+
+        /// <summary>
+        /// Creates a new MpiNetworkObserver.
+        /// </summary>
+        [Inject]
+        public MpiNetworkObserver()
+        {
+            _commGroupHandlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
+        }
+
+        /// <summary>
+        /// Handles the incoming NsMessage for this Task.
+        /// Delegates the GroupCommunicationMessage to the correct
+        /// CommunicationGroupNetworkObserver.
+        /// </summary>
+        /// <param name="nsMessage">The incoming message; only its first data
+        /// element is dispatched</param>
+        /// <exception cref="ArgumentNullException">If nsMessage is null</exception>
+        /// <exception cref="InvalidOperationException">If nsMessage carries no data</exception>
+        /// <exception cref="KeyNotFoundException">If no handler is registered for
+        /// the group named in the message</exception>
+        public void OnNext(NsMessage<GroupCommunicationMessage> nsMessage)
+        {
+            if (nsMessage == null)
+            {
+                throw new ArgumentNullException("nsMessage");
+            }
+
+            // Only the extraction is guarded, so an InvalidOperationException thrown
+            // later by a downstream handler is not misreported as "no data".
+            GroupCommunicationMessage gcm;
+            try
+            {
+                gcm = nsMessage.Data.First();
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Error, "Mpi Network Handler received message with no data");
+                throw;
+            }
+
+            // Explicit lookup so that only a missing group is reported as such; a
+            // KeyNotFoundException raised inside the handler propagates untouched.
+            IObserver<GroupCommunicationMessage> handler;
+            if (!_commGroupHandlers.TryGetValue(gcm.GroupName, out handler))
+            {
+                LOGGER.Log(Level.Error, "Mpi Network Handler received message for nonexistant group");
+                throw new KeyNotFoundException("No handler registered for communication group: " + gcm.GroupName);
+            }
+
+            handler.OnNext(gcm);
+        }
+
+        /// <summary>
+        /// Registers the network handler for the given CommunicationGroup.
+        /// When messages are sent to the specified group name, the given handler
+        /// will be invoked with that message. Registering a second handler under
+        /// the same group name replaces the first one.
+        /// </summary>
+        /// <param name="groupName">The group name for the network handler</param>
+        /// <param name="commGroupHandler">The network handler to invoke when
+        /// messages are sent to the given group.</param>
+        /// <exception cref="ArgumentNullException">If groupName is null or empty,
+        /// or commGroupHandler is null</exception>
+        public void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler)
+        {
+            if (string.IsNullOrEmpty(groupName))
+            {
+                throw new ArgumentNullException("groupName");
+            }
+            if (commGroupHandler == null)
+            {
+                throw new ArgumentNullException("commGroupHandler");
+            }
+
+            _commGroupHandlers[groupName] = commGroupHandler;
+        }
+
+        /// <summary>
+        /// Does nothing; errors are not propagated to the group handlers.
+        /// </summary>
+        /// <param name="error">The error that was reported (ignored)</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Does nothing; completion is not propagated to the group handlers.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
new file mode 100644
index 0000000..f4c7a60
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using System.Collections.Concurrent;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Queue of the incoming messages that were sent by one particular Task.
+    /// </summary>
+    internal class NodeStruct
+    {
+        private readonly BlockingCollection<GroupCommunicationMessage> _messageQueue =
+            new BlockingCollection<GroupCommunicationMessage>();
+
+        /// <summary>
+        /// Creates a NodeStruct with an empty message queue.
+        /// </summary>
+        /// <param name="id">The Task identifier</param>
+        public NodeStruct(string id)
+        {
+            Identifier = id;
+        }
+
+        /// <summary>
+        /// The identifier of the Task whose messages are held in the queue.
+        /// </summary>
+        public string Identifier { get; private set; }
+
+        /// <summary>
+        /// Removes the next message from the queue and returns its data.
+        /// BlockingCollection.Take waits until a message is available.
+        /// </summary>
+        /// <returns>The data of the first available message.</returns>
+        public byte[][] GetData()
+        {
+            GroupCommunicationMessage next = _messageQueue.Take();
+            return next.Data;
+        }
+
+        /// <summary>
+        /// Appends an incoming message to the queue.
+        /// </summary>
+        /// <param name="gcm">The incoming message</param>
+        public void AddData(GroupCommunicationMessage gcm)
+        {
+            _messageQueue.Add(gcm);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
new file mode 100644
index 0000000..8752203
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// Contains the Operator's topology graph.
+    /// Used to send or receive messages to/from operators in the same
+    /// Communication Group.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class OperatorTopology<T> : IObserver<GroupCommunicationMessage>
+    {
+        // Timeout, in milliseconds, used when waiting for a node with data.
+        private const int DefaultTimeout = 10000;
+
+        // Number of Name Service lookups attempted per Task by Initialize().
+        private const int RetryCount = 5;
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(OperatorTopology<>));
+
+        // All instance fields are assigned exactly once, in the constructor.
+        private readonly string _groupName;
+        private readonly string _operatorName;
+        private readonly string _selfId;
+        private readonly string _driverId;
+
+        // Null when this Task is the root of the topology.
+        private readonly NodeStruct _parent;
+
+        // Empty when this Task is not the root of the topology.
+        private readonly List<NodeStruct> _children;
+
+        // Maps a Task identifier to the node holding that Task's incoming messages.
+        private readonly Dictionary<string, NodeStruct> _idToNodeMap;
+        private readonly ICodec<T> _codec;
+        private readonly INameClient _nameClient;
+        private readonly Sender _sender;
+
+        // One entry is added per incoming message, naming the node that received it.
+        private readonly BlockingCollection<NodeStruct> _nodesWithData;
+
+        /// <summary>
+        /// Creates a new OperatorTopology object.
+        /// If this Task is the root, every child identifier becomes a child node;
+        /// otherwise the root Task becomes this Task's parent node.
+        /// </summary>
+        /// <param name="operatorName">The name of the MPI Operator</param>
+        /// <param name="groupName">The name of the operator's Communication Group</param>
+        /// <param name="taskId">The operator's Task identifier</param>
+        /// <param name="driverId">The identifier for the driver</param>
+        /// <param name="rootId">The identifier for the root Task in the topology graph</param>
+        /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
+        /// <param name="networkService">The network service, used for its naming client</param>
+        /// <param name="codec">The codec used to serialize and deserialize messages</param>
+        /// <param name="sender">The Sender used to do point to point communication</param>
+        [Inject]
+        public OperatorTopology(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+            [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+            [Parameter(typeof(MpiConfigurationOptions.TopologyRootTaskId))] string rootId,
+            [Parameter(typeof(MpiConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
+            NetworkService<GroupCommunicationMessage> networkService,
+            ICodec<T> codec,
+            Sender sender)
+        {
+            _operatorName = operatorName;
+            _groupName = groupName;
+            _selfId = taskId;
+            _driverId = driverId;
+            _codec = codec;
+            _nameClient = networkService.NamingClient;
+            _sender = sender;
+            _nodesWithData = new BlockingCollection<NodeStruct>();
+            _children = new List<NodeStruct>();
+            _idToNodeMap = new Dictionary<string, NodeStruct>();
+
+            if (_selfId.Equals(rootId))
+            {
+                // The root has no parent and talks to every child.
+                _parent = null;
+                foreach (string childId in childIds)
+                {
+                    NodeStruct node = new NodeStruct(childId);
+                    _children.Add(node);
+                    _idToNodeMap[childId] = node;
+                }
+            }
+            else
+            {
+                // A non-root Task only talks to the root.
+                _parent = new NodeStruct(rootId);
+                _idToNodeMap[rootId] = _parent;
+            }
+        }
+
+        /// <summary>
+        /// Initializes operator topology.
+        /// Blocks until the Tasks this operator communicates with (its parent, if any,
+        /// followed by each of its children) can be looked up in the Name Service.
+        /// </summary>
+        /// <exception cref="IllegalStateException">A Task could not be looked up
+        /// within RetryCount attempts</exception>
+        public void Initialize()
+        {
+            using (LOGGER.LogFunction("OperatorTopology::Initialize"))
+            {
+                // Collect the neighbours in the order they are waited on: parent first.
+                var neighborIds = new List<string>();
+                if (_parent != null)
+                {
+                    neighborIds.Add(_parent.Identifier);
+                }
+
+                neighborIds.AddRange(_children.Select(child => child.Identifier));
+
+                foreach (string neighborId in neighborIds)
+                {
+                    WaitForTaskRegistration(neighborId, RetryCount);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles the incoming GroupCommunicationMessage.
+        /// Records the sending node as having data and appends the message
+        /// to that node's message queue.
+        /// </summary>
+        /// <param name="gcm">The incoming message</param>
+        /// <exception cref="ArgumentNullException">gcm is null</exception>
+        /// <exception cref="ArgumentException">gcm has no source</exception>
+        /// <exception cref="IllegalStateException">The source is not a node of this topology</exception>
+        public void OnNext(GroupCommunicationMessage gcm)
+        {
+            if (gcm == null)
+            {
+                throw new ArgumentNullException("gcm");
+            }
+
+            var sourceId = gcm.Source;
+            if (sourceId == null)
+            {
+                throw new ArgumentException("Message must have a source");
+            }
+
+            // Only the parent or the children registered in the constructor are valid senders.
+            NodeStruct origin;
+            bool isKnownSender = _idToNodeMap.TryGetValue(sourceId, out origin);
+            if (!isKnownSender || origin == null)
+            {
+                throw new IllegalStateException("Received message from invalid task id: " + sourceId);
+            }
+
+            _nodesWithData.Add(origin);
+            origin.AddData(gcm);
+        }
+
+        /// <summary>
+        /// Sends the message to the parent Task.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="type">The message type to tag the outgoing message with</param>
+        /// <exception cref="ArgumentException">This Task has no parent</exception>
+        public void SendToParent(T message, MessageType type)
+        {
+            if (_parent == null)
+            {
+                throw new ArgumentException("No parent for node");
+            }
+
+            // Forward the caller's message type instead of always sending MessageType.Data.
+            SendToNode(message, type, _parent);
+        }
+
+        /// <summary>
+        /// Sends the message to all child Tasks.
+        /// Does nothing if this Task has no children.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="type">The message type to tag the outgoing messages with</param>
+        /// <exception cref="ArgumentNullException">message is null</exception>
+        public void SendToChildren(T message, MessageType type)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            foreach (NodeStruct child in _children)
+            {
+                // Forward the caller's message type instead of always sending MessageType.Data.
+                SendToNode(message, type, child);
+            }
+        }
+
+        /// <summary>
+        /// Scatters the list of messages across the child Tasks, giving each
+        /// child a sublist of ceil(messages.Count / number of children) messages
+        /// (the last sublist may be shorter).
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="type">The message type (accepted but not used by this method)</param>
+        /// <exception cref="ArgumentNullException">messages is null</exception>
+        /// <exception cref="ArgumentException">This Task has no children</exception>
+        public void ScatterToChildren(List<T> messages, MessageType type)
+        {
+            if (messages == null)
+            {
+                throw new ArgumentNullException("messages");
+            }
+
+            int childCount = _children.Count;
+            if (childCount <= 0)
+            {
+                throw new ArgumentException("Cannot scatter, no children available");
+            }
+
+            // Round up so that every message is assigned to some child.
+            double messagesPerChild = ((double) messages.Count) / childCount;
+            int sublistSize = (int) Math.Ceiling(messagesPerChild);
+            ScatterHelper(messages, _children, sublistSize);
+        }
+
+        /// <summary>
+        /// Scatters the list of messages across the child Tasks in sublists
+        /// of the requested size.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="count">The size of each sublist; must be at least one</param>
+        /// <param name="type">The message type (accepted but not used by this method)</param>
+        /// <exception cref="ArgumentNullException">messages is null</exception>
+        /// <exception cref="ArgumentException">count is not positive</exception>
+        public void ScatterToChildren(List<T> messages, int count, MessageType type)
+        {
+            if (messages == null)
+            {
+                throw new ArgumentNullException("messages");
+            }
+
+            bool countIsPositive = count >= 1;
+            if (!countIsPositive)
+            {
+                throw new ArgumentException("Count must be positive");
+            }
+
+            ScatterHelper(messages, _children, count);
+        }
+
+        /// <summary>
+        /// Splits the list of messages up evenly and sends each sublist
+        /// to the child Tasks in the specified order. Each sublist holds
+        /// ceil(messages.Count / number of children) messages, except that
+        /// the last one may be shorter.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="order">The Task identifiers, in the order sublists are sent;
+        /// must have as many entries as there are child Tasks</param>
+        /// <param name="type">The message type (accepted but not used by this method)</param>
+        /// <exception cref="ArgumentNullException">messages is null</exception>
+        /// <exception cref="ArgumentException">order is null or has the wrong length</exception>
+        /// <exception cref="IllegalStateException">order names a Task that is not
+        /// part of this topology</exception>
+        public void ScatterToChildren(List<T> messages, List<string> order, MessageType type)
+        {
+            if (messages == null)
+            {
+                throw new ArgumentNullException("messages");
+            }
+            if (order == null || order.Count != _children.Count)
+            {
+                throw new ArgumentException("order cannot be null and must have the same number of elements as child tasks");
+            }
+
+            // Resolve every identifier before sending anything.
+            List<NodeStruct> nodes = new List<NodeStruct>(order.Count);
+            foreach (string taskId in order)
+            {
+                NodeStruct node = FindNode(taskId);
+                if (node == null)
+                {
+                    // Nothing was received here; the caller supplied an unknown id.
+                    throw new IllegalStateException("Cannot scatter to invalid task id: " + taskId);
+                }
+
+                nodes.Add(node);
+            }
+
+            int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count);
+            ScatterHelper(messages, nodes, count);
+        }
+
+        /// <summary>
+        /// Receives the next incoming message from the parent Task and decodes it.
+        /// The parent's message must consist of exactly one encoded element.
+        /// </summary>
+        /// <returns>The parent Task's message</returns>
+        /// <exception cref="InvalidOperationException">The received data is null or
+        /// does not contain exactly one element</exception>
+        public T ReceiveFromParent()
+        {
+            byte[][] payload = ReceiveFromNode(_parent, true);
+
+            bool isSingleMessage = payload != null && payload.Length == 1;
+            if (!isSingleMessage)
+            {
+                throw new InvalidOperationException("Cannot receive data from parent node");
+            }
+
+            byte[] encoded = payload[0];
+            return _codec.Decode(encoded);
+        }
+
+        /// <summary>
+        /// Receives the next incoming message from the parent Task and decodes
+        /// each of its elements.
+        /// </summary>
+        /// <returns>The parent Task's list of messages</returns>
+        /// <exception cref="InvalidOperationException">The received data is null or empty</exception>
+        public List<T> ReceiveListFromParent()
+        {
+            byte[][] payload = ReceiveFromNode(_parent, true);
+
+            bool hasMessages = payload != null && payload.Length != 0;
+            if (!hasMessages)
+            {
+                throw new InvalidOperationException("Cannot receive data from parent node");
+            }
+
+            // Decode element by element, preserving the order sent by the parent.
+            var decoded = new List<T>();
+            foreach (byte[] encoded in payload)
+            {
+                decoded.Add(_codec.Decode(encoded));
+            }
+
+            return decoded;
+        }
+
+        /// <summary>
+        /// Receives messages from the child Tasks until every child has been heard
+        /// from, then reduces the received messages with the given IReduceFunction.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): a message is accepted even when its sender has already been
+        /// received from in this call, so two queued messages from the same child are
+        /// both passed to the reduce function - confirm callers cannot trigger this.
+        /// </remarks>
+        /// <param name="reduceFunction">The class used to reduce messages</param>
+        /// <returns>The result of reducing messages</returns>
+        /// <exception cref="ArgumentNullException">reduceFunction is null</exception>
+        /// <exception cref="InvalidOperationException">A child's data is null or does
+        /// not contain exactly one element</exception>
+        public T ReceiveFromChildren(IReduceFunction<T> reduceFunction)
+        {
+            if (reduceFunction == null)
+            {
+                throw new ArgumentNullException("reduceFunction");
+            }
+
+            var reduceInputs = new List<T>();
+
+            // Identifiers of the children that have not delivered a message yet.
+            var pendingChildIds = new HashSet<string>();
+            foreach (NodeStruct child in _children)
+            {
+                pendingChildIds.Add(child.Identifier);
+            }
+
+            while (pendingChildIds.Count != 0)
+            {
+                NodeStruct sender = GetNodeWithData();
+                byte[][] payload = ReceiveFromNode(sender, false);
+
+                bool isSingleMessage = payload != null && payload.Length == 1;
+                if (!isSingleMessage)
+                {
+                    throw new InvalidOperationException("Received invalid data from child with id: " + sender.Identifier);
+                }
+
+                reduceInputs.Add(_codec.Decode(payload[0]));
+                pendingChildIds.Remove(sender.Identifier);
+            }
+
+            return reduceFunction.Reduce(reduceInputs);
+        }
+
+        /// <summary>
+        /// Handles an error notification from the incoming message stream.
+        /// The error is logged so that it is not lost silently; it is not rethrown.
+        /// </summary>
+        /// <param name="error">The reported error</param>
+        public void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Error, "OperatorTopology received error: " + error);
+        }
+
+        /// <summary>
+        /// Handles the completion notification of the incoming message stream.
+        /// Currently does nothing.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Get a node containing an incoming message.
+        /// Waits up to DefaultTimeout milliseconds for a node to become available.
+        /// </summary>
+        /// <returns>A NodeStruct with incoming data.</returns>
+        /// <exception cref="OperationCanceledException">No node became available
+        /// before the timeout elapsed</exception>
+        private NodeStruct GetNodeWithData()
+        {
+            // CancellationTokenSource is IDisposable; release it as soon as the
+            // wait is over, whether it succeeded or threw.
+            using (var timeoutSource = new CancellationTokenSource(DefaultTimeout))
+            {
+                try
+                {
+                    return _nodesWithData.Take(timeoutSource.Token);
+                }
+                catch (OperationCanceledException)
+                {
+                    LOGGER.Log(Level.Error, "No data to read from child");
+                    throw;
+                }
+                catch (ObjectDisposedException)
+                {
+                    LOGGER.Log(Level.Error, "No data to read from child");
+                    throw;
+                }
+                catch (InvalidOperationException)
+                {
+                    LOGGER.Log(Level.Error, "No data to read from child");
+                    throw;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Encodes a single message and sends it to the Task represented by
+        /// the given NodeStruct.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="msgType">The message type</param>
+        /// <param name="node">The NodeStruct representing the Task to send to</param>
+        private void SendToNode(T message, MessageType msgType, NodeStruct node)
+        {
+            var destinationId = node.Identifier;
+            var payload = _codec.Encode(message);
+
+            var outgoing = new GroupCommunicationMessage(
+                _groupName,
+                _operatorName,
+                _selfId,
+                destinationId,
+                payload,
+                msgType);
+
+            _sender.Send(outgoing);
+        }
+
+        /// <summary>
+        /// Encodes every message in the list and sends them, as one group
+        /// communication message, to the Task represented by the given NodeStruct.
+        /// </summary>
+        /// <param name="messages">The list of messages to send</param>
+        /// <param name="msgType">The message type</param>
+        /// <param name="node">The NodeStruct representing the Task to send to</param>
+        private void SendToNode(List<T> messages, MessageType msgType, NodeStruct node)
+        {
+            byte[][] payload =
+                (from message in messages
+                 select _codec.Encode(message)).ToArray();
+
+            var outgoing = new GroupCommunicationMessage(
+                _groupName,
+                _operatorName,
+                _selfId,
+                node.Identifier,
+                payload,
+                msgType);
+
+            _sender.Send(outgoing);
+        }
+
+        /// <summary>
+        /// Sends consecutive sublists of the messages to the given nodes, in order.
+        /// Every node receives count messages, except the last node, which may
+        /// receive fewer. Messages beyond order.Count * count are not sent.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="order">The nodes to send to, in sending order</param>
+        /// <param name="count">The size of each sublist</param>
+        /// <exception cref="ArgumentException">count is not positive, or there are too
+        /// few messages for every node to receive at least one</exception>
+        private void ScatterHelper(List<T> messages, List<NodeStruct> order, int count)
+        {
+            if (count <= 0)
+            {
+                throw new ArgumentException("Count must be positive");
+            }
+
+            // The k-th node (0-based) starts at offset k * count, so the last node
+            // only gets a non-empty sublist when messages.Count > (order.Count - 1) * count.
+            // Check this before sending anything so that a failed scatter does not
+            // leave some nodes with data and others without.
+            if (order.Count > 0 && (long) (order.Count - 1) * count >= messages.Count)
+            {
+                throw new ArgumentException("Scatter count must be positive");
+            }
+
+            int offset = 0;
+            foreach (NodeStruct nodeStruct in order)
+            {
+                // The last sublist might be smaller than count if the number of
+                // messages is not evenly divisible by count
+                int size = Math.Min(count, messages.Count - offset);
+
+                List<T> sublist = messages.GetRange(offset, size);
+                SendToNode(sublist, MessageType.Data, nodeStruct);
+
+                offset += size;
+            }
+        }
+
+        /// <summary>
+        /// Receive a message from the Task represented by the given NodeStruct,
+        /// blocking until that node has a message.
+        /// Removes the NodeStruct from the nodesWithData queue if requested.
+        /// </summary>
+        /// <param name="node">The node to receive from</param>
+        /// <param name="removeFromQueue">Whether or not to remove the NodeStruct
+        /// from the nodesWithData queue</param>
+        /// <returns>The byte array message from the node</returns>
+        private byte[][] ReceiveFromNode(NodeStruct node, bool removeFromQueue)
+        {
+            byte[][] payload = node.GetData();
+            if (!removeFromQueue)
+            {
+                return payload;
+            }
+
+            // NOTE(review): Take(node) is not a BlockingCollection member; presumably an
+            // extension method from Org.Apache.REEF.Network.Utilities - confirm its semantics.
+            _nodesWithData.Take(node);
+            return payload;
+        }
+
+        /// <summary>
+        /// Looks up the NodeStruct registered for the given Task identifier.
+        /// </summary>
+        /// <param name="identifier">The identifier of the Task</param>
+        /// <returns>The NodeStruct, or null if no node has that identifier</returns>
+        private NodeStruct FindNode(string identifier)
+        {
+            NodeStruct match;
+            if (_idToNodeMap.TryGetValue(identifier, out match))
+            {
+                return match;
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Polls the Name Server until the identifier can be looked up,
+        /// pausing 500 milliseconds after every unsuccessful lookup.
+        /// Throws exception if the lookup fails as many times as the retry count.
+        /// </summary>
+        /// <param name="identifier">The identifier to look up</param>
+        /// <param name="retries">The number of times to try the lookup operation</param>
+        /// <exception cref="IllegalStateException">Every lookup attempt failed</exception>
+        private void WaitForTaskRegistration(string identifier, int retries)
+        {
+            int failedAttempts = 0;
+            while (failedAttempts < retries)
+            {
+                bool isRegistered = _nameClient.Lookup(identifier) != null;
+                if (isRegistered)
+                {
+                    return;
+                }
+
+                failedAttempts++;
+                Thread.Sleep(500);
+                LOGGER.Log(Level.Verbose, "Retry {0}: retrying lookup for node: {1}", failedAttempts, identifier);
+            }
+
+            throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
new file mode 100644
index 0000000..5342410
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// Represents a graph of MPI Operators where there are only two levels of
+    /// nodes: the root and all children extending from the root.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class FlatTopology<T> : ITopology<T>
+    {
+        // Assigned once in the constructor.
+        private readonly string _groupName;
+        private readonly string _operatorName;
+
+        private readonly string _rootId;
+        private readonly string _driverId;
+
+        // Child task nodes keyed by task identifier; the root node is kept in _root instead.
+        private readonly Dictionary<string, TaskNode> _nodes;
+
+        // Null until the root task has been added.
+        private TaskNode _root;
+
+        /// <summary>
+        /// Creates a new FlatTopology with no tasks.
+        /// </summary>
+        /// <param name="operatorName">The operator name</param>
+        /// <param name="groupName">The name of the topology's CommunicationGroup</param>
+        /// <param name="rootId">The root Task identifier</param>
+        /// <param name="driverId">The driver identifier</param>
+        /// <param name="operatorSpec">The operator specification</param>
+        public FlatTopology(
+            string operatorName, 
+            string groupName, 
+            string rootId,
+            string driverId,
+            IOperatorSpec<T> operatorSpec)
+        {
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _rootId = rootId;
+            _driverId = driverId;
+
+            OperatorSpec = operatorSpec;
+
+            _nodes = new Dictionary<string, TaskNode>();
+        }
+
+        /// <summary>
+        /// Gets or sets the Operator specification.
+        /// GetTaskConfiguration reads its concrete type and codec to decide
+        /// which bindings to emit.
+        /// </summary>
+        public IOperatorSpec<T> OperatorSpec { get; set; }
+
+        /// <summary>
+        /// Builds the task configuration for the operator topology.
+        /// The configuration binds the codec type, the root task identifier, every
+        /// non-root task identifier as a child, and the operator implementation
+        /// that matches the task's role in the operator specification.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The task configuration</returns>
+        /// <exception cref="NotSupportedException">The operator specification is not a
+        /// broadcast, reduce or scatter specification</exception>
+        public IConfiguration GetTaskConfiguration(string taskId)
+        {
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
+                    GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
+                    _rootId);
+
+            // Every task other than the root is a child in a flat topology.
+            foreach (string childId in _nodes.Keys.Where(id => !id.Equals(_rootId)))
+            {
+                confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                    GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                    childId);
+            }
+
+            // At most one of these is used; they are checked in the order below.
+            var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+            var reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
+            var scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+
+            if (broadcastSpec != null)
+            {
+                bool isSender = taskId.Equals(broadcastSpec.SenderId);
+                if (isSender)
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                }
+            }
+            else if (reduceSpec != null)
+            {
+                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+
+                bool isReceiver = taskId.Equals(reduceSpec.ReceiverId);
+                if (isReceiver)
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                }
+            }
+            else if (scatterSpec != null)
+            {
+                bool isSender = taskId.Equals(scatterSpec.SenderId);
+                if (isSender)
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                }
+            }
+            else
+            {
+                throw new NotSupportedException("Spec type not supported");
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Adds a task to the topology graph, as the root if its identifier
+        /// matches the root identifier and as a child otherwise.
+        /// </summary>
+        /// <param name="taskId">The identifier of the task to add</param>
+        /// <exception cref="ArgumentNullException">taskId is null or empty</exception>
+        /// <exception cref="ArgumentException">The task has already been added</exception>
+        public void AddTask(string taskId)
+        {
+            if (string.IsNullOrEmpty(taskId))
+            {
+                throw new ArgumentNullException("taskId");
+            }
+
+            bool isRoot = taskId.Equals(_rootId);
+
+            // The root node is kept in _root rather than in _nodes, so it needs
+            // its own duplicate check; otherwise a second add would silently
+            // replace the existing root node.
+            bool alreadyAdded = isRoot ? _root != null : _nodes.ContainsKey(taskId);
+            if (alreadyAdded)
+            {
+                throw new ArgumentException("Task has already been added to the topology");
+            }
+
+            if (isRoot)
+            {
+                SetRootNode(_rootId);
+            }
+            else
+            {
+                AddChild(taskId);
+            }
+        }
+
+        /// <summary>
+        /// Creates the root node and links every child node added so far to it.
+        /// </summary>
+        /// <param name="rootId">The identifier of the root task</param>
+        private void SetRootNode(string rootId)
+        {
+            _root = new TaskNode(_groupName, _operatorName, rootId, _driverId);
+
+            // Children registered before the root have no parent yet; attach them now.
+            foreach (KeyValuePair<string, TaskNode> entry in _nodes)
+            {
+                TaskNode existingChild = entry.Value;
+                _root.AddChild(existingChild);
+                existingChild.SetParent(_root);
+            }
+        }
+
+        /// <summary>
+        /// Creates a child node for the given task and, if the root node
+        /// already exists, links the two.
+        /// </summary>
+        /// <param name="childId">The identifier of the child task</param>
+        private void AddChild(string childId)
+        {
+            var newChild = new TaskNode(_groupName, _operatorName, childId, _driverId);
+            _nodes[childId] = newChild;
+
+            // Without a root there is nothing to link to yet; SetRootNode links later.
+            if (_root == null)
+            {
+                return;
+            }
+
+            _root.AddChild(newChild);
+            newChild.SetParent(_root);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
new file mode 100644
index 0000000..c4dc9e6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// Represents a topology graph for IMpiOperators.
+    /// </summary>
+    /// <typeparam name="T">The type parameter of the operator specification</typeparam>
+    public interface ITopology<T>
+    {
+        /// <summary>
+        /// Gets the operator specification associated with this topology.
+        /// </summary>
+        IOperatorSpec<T> OperatorSpec { get; }
+
+        /// <summary>
+        /// Gets the configuration for the task with the given identifier.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The task configuration</returns>
+        IConfiguration GetTaskConfiguration(string taskId);
+
+        /// <summary>
+        /// Adds the task with the given identifier to the topology.
+        /// </summary>
+        /// <param name="taskId">The identifier of the task to add</param>
+        void AddTask(string taskId);
+    }
+}