You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/11 23:58:08 UTC

[3/4] incubator-reef git commit: [REEF-150] Adding group communication to REEF .Net

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
new file mode 100644
index 0000000..423e916
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    /// <summary>
+    /// Groups the Tang named parameters declared for configuring MPI-style
+    /// group communication (communication groups, operators and topology).
+    /// </summary>
+    public class MpiConfigurationOptions
+    {
+        /// <summary>String parameter holding the name of the communication group.</summary>
+        [NamedParameter("Name of the communication group")]
+        public class CommunicationGroupName : Name<string> { }
+
+        /// <summary>String parameter holding the name of the MPI operator.</summary>
+        [NamedParameter("Name of the MPI operator")]
+        public class OperatorName : Name<string> { }
+
+        /// <summary>String parameter holding the driver identifier.</summary>
+        [NamedParameter("Driver identifier")]
+        public class DriverId : Name<string> { }
+
+        /// <summary>String parameter holding the master task identifier.</summary>
+        [NamedParameter("Master task identifier")]
+        public class MasterTaskId : Name<string> { }
+
+        /// <summary>
+        /// Set-of-strings parameter holding serialized communication group configurations.
+        /// </summary>
+        [NamedParameter("Serialized communication group configuration")]
+        public class SerializedGroupConfigs : Name<ISet<string>> { }
+
+        /// <summary>
+        /// Set-of-strings parameter holding serialized operator configurations.
+        /// </summary>
+        [NamedParameter("Serialized operator configuration")]
+        public class SerializedOperatorConfigs : Name<ISet<string>> { }
+
+        /// <summary>String parameter holding the id of the root task in an operator topology.</summary>
+        [NamedParameter("Id of root task in operator topology")]
+        public class TopologyRootTaskId : Name<string> { }
+
+        /// <summary>Set-of-strings parameter holding the ids of the child tasks in an operator topology.</summary>
+        [NamedParameter("Ids of child tasks in operator topology")]
+        public class TopologyChildTaskIds : Name<ISet<string>> { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
new file mode 100644
index 0000000..a2a249d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Interface;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Group.Driver
+{
+    /// <summary>
+    /// Driver-side handle for configuring the MPI operators of one
+    /// Communication Group. Every operator in a Communication Group runs
+    /// on the same set of tasks.
+    /// </summary>
+    public interface ICommunicationGroupDriver
+    {
+        /// <summary>
+        /// Registers a Broadcast MPI operator with the communication group.
+        /// </summary>
+        /// <typeparam name="T">Type of the messages the operator sends</typeparam>
+        /// <param name="operatorName">Name of the broadcast operator</param>
+        /// <param name="spec">Specification describing the Broadcast operator</param>
+        /// <returns>This CommunicationGroupDriver, now including the Broadcast operator</returns>
+        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> spec);
+
+        /// <summary>
+        /// Registers a Reduce MPI operator with the communication group.
+        /// </summary>
+        /// <typeparam name="T">Type of the messages the operator sends</typeparam>
+        /// <param name="operatorName">Name of the reduce operator</param>
+        /// <param name="spec">Specification describing the Reduce operator</param>
+        /// <returns>This CommunicationGroupDriver, now including the Reduce operator</returns>
+        ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec);
+
+        /// <summary>
+        /// Registers a Scatter MPI operator with the communication group.
+        /// </summary>
+        /// <typeparam name="T">Type of the messages the operator sends</typeparam>
+        /// <param name="operatorName">Name of the scatter operator</param>
+        /// <param name="spec">Specification describing the Scatter operator</param>
+        /// <returns>This CommunicationGroupDriver, now including the Scatter operator</returns>
+        ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec);
+
+        /// <summary>
+        /// Marks the CommunicationGroupDriver as finalized.
+        /// Once finalized, the group accepts no further operators;
+        /// tasks may be added from this point on.
+        /// </summary>
+        /// <returns>This CommunicationGroupDriver, finalized</returns>
+        ICommunicationGroupDriver Build();
+
+        /// <summary>
+        /// Registers a task as a member of the communication group.
+        /// Build() must have been called on the CommunicationGroupDriver beforehand.
+        /// </summary>
+        /// <param name="taskId">Id of the task being added</param>
+        void AddTask(string taskId);
+
+        /// <summary>
+        /// The ids of the tasks that are members of this Communication Group
+        /// </summary>
+        List<string> TaskIds { get; }
+
+        /// <summary>
+        /// Produces the Task Configuration of this communication group for one task.
+        /// Call it only once every task has been added to the CommunicationGroupDriver.
+        /// </summary>
+        /// <param name="taskId">Id of a task that is a member of this Communication Group</param>
+        /// <returns>The Task Configuration for this communication group</returns>
+        IConfiguration GetGroupTaskConfiguration(string taskId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
new file mode 100644
index 0000000..422d63e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Network.Group.Driver
+{
+    /// <summary>
+    /// Entry point for creating Communication Groups for MPI Operators.
+    /// It also supplies the configuration needed by MPI tasks and services.
+    /// </summary>
+    public interface IMpiDriver
+    {
+        /// <summary>
+        /// Creates a CommunicationGroup with the given name and number of tasks/operators.
+        /// </summary>
+        /// <param name="groupName">Name of the new group</param>
+        /// <param name="numTasks">Number of tasks/operators in the group.</param>
+        /// <returns>The newly created Communication Group</returns>
+        ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks);
+
+        /// <summary>
+        /// Produces a context configuration carrying a unique identifier.
+        /// </summary>
+        /// <returns>The configured context configuration</returns>
+        IConfiguration GetContextConfiguration();
+
+        /// <summary>
+        /// Produces the service configuration that Reef tasks need in order to run MPI.
+        /// </summary>
+        /// <returns>The service configuration for the Reef tasks</returns>
+        IConfiguration GetServiceConfiguration();
+
+        /// <summary>
+        /// Produces the MPI configuration for one task.
+        ///
+        /// A task may be a member of several Communication Groups; each of them is
+        /// serialized into the configuration as a SerializedGroupConfig.
+        ///
+        /// The caller is expected to merge its own part of the task configuration
+        /// (task id, task class) with the MPI task configuration returned here.
+        /// </summary>
+        /// <param name="taskId">Id of the task whose Configuration is generated</param>
+        /// <returns>The MPI task configuration, with communication group and
+        /// operator configuration set.</returns>
+        IConfiguration GetMpiTaskConfiguration(string taskId);
+
+        /// <summary>
+        /// The identifier of the master task
+        /// </summary>
+        string MasterTaskId { get; }
+
+        /// <summary>
+        /// Tells whether the Master Task can run on the given active context.
+        /// </summary>
+        /// <param name="activeContext">The active context to inspect</param>
+        /// <returns>True when the active context can run the Master task,
+        /// false otherwise.</returns>
+        bool IsMasterTaskContext(IActiveContext activeContext);
+
+        /// <summary>
+        /// Tells whether the given context configuration is the one configuring the Master Task.
+        /// </summary>
+        /// <param name="contextConfiguration">The context configuration to inspect</param>
+        /// <returns>True when the context configuration configures the Master
+        /// Task, false otherwise.</returns>
+        bool IsMasterContextConfiguration(IConfiguration contextConfiguration);
+
+        /// <summary>
+        /// Looks up the context number that belongs to the id of an Active Context.
+        /// </summary>
+        /// <param name="activeContext">The active context to inspect</param>
+        /// <returns>The context number associated with the active context id</returns>
+        int GetContextNum(IActiveContext activeContext);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
new file mode 100644
index 0000000..d29c504
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Used to configure MPI operators in Reef driver.
+    /// All operators in the same Communication Group run on the
+    /// same set of tasks.
+    /// </summary>
+    public class CommunicationGroupDriver : ICommunicationGroupDriver
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupDriver));
+
+        private readonly string _groupName;
+        private readonly string _driverId;
+        private readonly int _numTasks;
+        private int _tasksAdded;
+        private bool _finalized;
+
+        private readonly AvroConfigurationSerializer _confSerializer;
+
+        private readonly object _topologyLock = new object();
+        private readonly Dictionary<string, object> _operatorSpecs = new Dictionary<string, object>();
+        private readonly Dictionary<string, object> _topologies = new Dictionary<string, object>();
+
+        /// <summary>
+        /// Create a new CommunicationGroupDriver.
+        /// </summary>
+        /// <param name="groupName">The communication group name</param>
+        /// <param name="driverId">Identifier of the Reef driver</param>
+        /// <param name="numTasks">The number of tasks each operator will use</param>
+        /// <param name="confSerializer">Used to serialize task configuration</param>
+        public CommunicationGroupDriver(
+            string groupName,
+            string driverId,
+            int numTasks,
+            AvroConfigurationSerializer confSerializer)
+        {
+            _groupName = groupName;
+            _driverId = driverId;
+            _numTasks = numTasks;
+            _confSerializer = confSerializer;
+
+            TaskIds = new List<string>();
+        }
+
+        /// <summary>
+        /// Returns the list of task ids that belong to this Communication Group
+        /// </summary>
+        public List<string> TaskIds { get; private set; }
+
+        /// <summary>
+        /// Adds the Broadcast MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="operatorName">The name of the broadcast operator</param>
+        /// <param name="spec">The specification that defines the Broadcast operator</param>
+        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
+        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
+        public ICommunicationGroupDriver AddBroadcast<T>(
+            string operatorName,
+            BroadcastOperatorSpec<T> spec)
+        {
+            EnsureNotFinalized();
+
+            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+            _topologies[operatorName] = topology;
+            _operatorSpecs[operatorName] = spec;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Adds the Reduce MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="operatorName">The name of the reduce operator</param>
+        /// <param name="spec">The specification that defines the Reduce operator</param>
+        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
+        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
+        public ICommunicationGroupDriver AddReduce<T>(
+            string operatorName,
+            ReduceOperatorSpec<T> spec)
+        {
+            EnsureNotFinalized();
+
+            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
+            _topologies[operatorName] = topology;
+            _operatorSpecs[operatorName] = spec;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Adds the Scatter MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send</typeparam>
+        /// <param name="operatorName">The name of the scatter operator</param>
+        /// <param name="spec">The specification that defines the Scatter operator</param>
+        /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
+        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
+        public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec)
+        {
+            EnsureNotFinalized();
+
+            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+            _topologies[operatorName] = topology;
+            _operatorSpecs[operatorName] = spec;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Finalizes the CommunicationGroupDriver.
+        /// After the CommunicationGroupDriver has been finalized, no more operators may
+        /// be added to the group.
+        /// </summary>
+        /// <returns>The same finalized CommunicationGroupDriver</returns>
+        public ICommunicationGroupDriver Build()
+        {
+            _finalized = true;
+            return this;
+        }
+
+        /// <summary>
+        /// Add a task to the communication group.
+        /// The CommunicationGroupDriver must have called Build() before adding tasks to the group.
+        /// </summary>
+        /// <param name="taskId">The id of the task to add</param>
+        /// <exception cref="IllegalStateException">If Build() was not called, or the group is already full</exception>
+        public void AddTask(string taskId)
+        {
+            if (!_finalized)
+            {
+                throw new IllegalStateException("CommunicationGroupDriver must call Build() before adding tasks to the group.");
+            }
+
+            lock (_topologyLock)
+            {
+                // Check capacity before counting: a rejected task must not push the counter
+                // past _numTasks, or GetGroupTaskConfiguration could never succeed afterwards.
+                if (_tasksAdded >= _numTasks)
+                {
+                    throw new IllegalStateException("Added too many tasks to Communication Group, expected: " + _numTasks);
+                }
+
+                _tasksAdded++;
+                TaskIds.Add(taskId);
+                foreach (string operatorName in _operatorSpecs.Keys)
+                {
+                    InvokeOnTopology(operatorName, "AddTask", taskId);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Get the Task Configuration for this communication group.
+        /// Must be called only after all tasks have been added to the CommunicationGroupDriver.
+        /// </summary>
+        /// <param name="taskId">The task id of the task that belongs to this Communication Group</param>
+        /// <returns>The Task Configuration for this communication group, or null if the task is not in the group</returns>
+        /// <exception cref="IllegalStateException">If not all tasks have been added yet</exception>
+        public IConfiguration GetGroupTaskConfiguration(string taskId)
+        {
+            // AddTask mutates TaskIds and _tasksAdded under this lock, so read both under it too.
+            lock (_topologyLock)
+            {
+                if (!TaskIds.Contains(taskId))
+                {
+                    return null;
+                }
+
+                if (_tasksAdded != _numTasks)
+                {
+                    throw new IllegalStateException(
+                        "Must add all tasks to communication group before fetching configuration");
+                }
+            }
+
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
+                    GenericType<MpiConfigurationOptions.DriverId>.Class,
+                    _driverId)
+                .BindNamedParameter<MpiConfigurationOptions.CommunicationGroupName, string>(
+                    GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class,
+                    _groupName);
+
+            foreach (var operatorName in _topologies.Keys)
+            {
+                IConfiguration operatorConf =
+                    (IConfiguration) InvokeOnTopology(operatorName, "GetTaskConfiguration", taskId);
+                var innerConf = TangFactory.GetTang().NewConfigurationBuilder(operatorConf)
+                    .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
+                        GenericType<MpiConfigurationOptions.DriverId>.Class,
+                        _driverId)
+                    .BindNamedParameter<MpiConfigurationOptions.OperatorName, string>(
+                        GenericType<MpiConfigurationOptions.OperatorName>.Class,
+                        operatorName)
+                    .Build();
+                confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedOperatorConfigs, string>(
+                    GenericType<MpiConfigurationOptions.SerializedOperatorConfigs>.Class,
+                    _confSerializer.ToString(innerConf));
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Rejects operator registration once Build() has finalized the group.
+        /// </summary>
+        /// <exception cref="IllegalStateException">If the group is already finalized</exception>
+        private void EnsureNotFinalized()
+        {
+            if (_finalized)
+            {
+                throw new IllegalStateException("Can't add operators once the spec has been built.");
+            }
+        }
+
+        // Topologies are stored as object because their generic argument differs per
+        // operator, so their task-id methods are looked up by name through reflection.
+        private object InvokeOnTopology(string operatorName, string methodName, string taskId)
+        {
+            object topology = _topologies[operatorName];
+            MethodInfo method = topology.GetType().GetMethod(methodName);
+            return method.Invoke(topology, new object[] { taskId });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
new file mode 100644
index 0000000..1439a36
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// A message exchanged by MPI Operators, carrying its routing
+    /// information (group, operator, source, destination) and payload.
+    /// </summary>
+    public class GroupCommunicationMessage
+    {
+        /// <summary>
+        /// The name of the Communication Group.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// The name of the MPI Operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// The source of the message.
+        /// </summary>
+        public string Source { get; private set; }
+
+        /// <summary>
+        /// The destination of the message.
+        /// </summary>
+        public string Destination { get; private set; }
+
+        /// <summary>
+        /// The message data, as an array of byte arrays.
+        /// </summary>
+        public byte[][] Data { get; private set; }
+
+        /// <summary>
+        /// The type of message being sent.
+        /// </summary>
+        public MessageType MsgType { get; private set; }
+
+        /// <summary>
+        /// Create new CommunicationGroupMessage whose payload is
+        /// already an array of byte arrays. The array is stored as
+        /// given, without copying.
+        /// </summary>
+        /// <param name="groupName">The name of the communication group</param>
+        /// <param name="operatorName">The name of the MPI operator</param>
+        /// <param name="source">The message source</param>
+        /// <param name="destination">The message destination</param>
+        /// <param name="data">The byte arrays that make up the data</param>
+        /// <param name="messageType">The type of message to send</param>
+        public GroupCommunicationMessage(
+            string groupName,
+            string operatorName,
+            string source,
+            string destination,
+            byte[][] data,
+            MessageType messageType)
+        {
+            MsgType = messageType;
+            Data = data;
+            Destination = destination;
+            Source = source;
+            OperatorName = operatorName;
+            GroupName = groupName;
+        }
+
+        /// <summary>
+        /// Create new CommunicationGroupMessage from a single byte
+        /// array. The array is wrapped in a one-element array of
+        /// byte arrays before being stored in Data.
+        /// </summary>
+        /// <param name="groupName">The name of the communication group</param>
+        /// <param name="operatorName">The name of the MPI operator</param>
+        /// <param name="source">The message source</param>
+        /// <param name="destination">The message destination</param>
+        /// <param name="data">The actual byte array of data</param>
+        /// <param name="messageType">The type of message to send</param>
+        public GroupCommunicationMessage(
+            string groupName,
+            string operatorName,
+            string source,
+            string destination,
+            byte[] data,
+            MessageType messageType)
+            : this(groupName, operatorName, source, destination, new[] { data }, messageType)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
new file mode 100644
index 0000000..cd8ace2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Enumerates the types of messages that MPI tasks can send to each other.
+    /// </summary>
+    public enum MessageType
+    {
+        /// <summary>The only type declared so far; presumably a message carrying operator payload data - TODO confirm.</summary>
+        Data = 0
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
new file mode 100644
index 0000000..a373ef3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Net;
+using System.Threading;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Network.Group.Codec;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Used to create Communication Groups for MPI Operators on the Reef driver.
+    /// Also manages configuration for MPI tasks/services.
+    /// </summary>
+    public class MpiDriver : IMpiDriver
+    {
+        private const string MasterTaskContextName = "MasterTaskContext";
+        private const string SlaveTaskContextName = "SlaveTaskContext";
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MpiDriver));
+
+        private readonly string _driverId;
+        private readonly string _nameServerAddr;
+        private readonly int _nameServerPort;
+        private int _contextIds; // advanced with Interlocked.Increment, so it cannot be readonly
+
+        private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups;
+        private readonly AvroConfigurationSerializer _configSerializer;
+        private readonly NameServer _nameServer;
+
+        /// <summary>
+        /// Create a new MpiDriver object.
+        /// </summary>
+        /// <param name="driverId">Identifier for the REEF driver</param>
+        /// <param name="masterTaskId">Identifier for MPI master task</param>
+        /// <param name="configSerializer">Used to serialize task configuration</param>
+        [Inject]
+        public MpiDriver(
+            [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+            [Parameter(typeof(MpiConfigurationOptions.MasterTaskId))] string masterTaskId,
+            AvroConfigurationSerializer configSerializer)
+        {
+            _driverId = driverId;
+            _contextIds = -1; // the first Interlocked.Increment then yields 0, the master context
+            MasterTaskId = masterTaskId;
+
+            _configSerializer = configSerializer;
+            _commGroups = new Dictionary<string, ICommunicationGroupDriver>();
+            _nameServer = new NameServer(0);
+            // The address and port handed to tasks are read back from the name server's endpoint.
+            IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
+            _nameServerAddr = localEndpoint.Address.ToString();
+            _nameServerPort = localEndpoint.Port;
+        }
+
+        /// <summary>
+        /// Returns the identifier for the master task
+        /// </summary>
+        public string MasterTaskId { get; private set; }
+
+        /// <summary>
+        /// Create a new CommunicationGroup with the given name and number of tasks/operators.
+        /// </summary>
+        /// <param name="groupName">The new group name</param>
+        /// <param name="numTasks">The number of tasks/operators in the group.</param>
+        /// <returns>The new Communication Group</returns>
+        /// <exception cref="ArgumentNullException">groupName is null or empty</exception>
+        /// <exception cref="ArgumentException">numTasks is less than 1, or groupName is already registered</exception>
+        public ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks)
+        {
+            if (string.IsNullOrEmpty(groupName))
+            {
+                throw new ArgumentNullException("groupName");
+            }
+            if (numTasks < 1)
+            {
+                throw new ArgumentException("NumTasks must be greater than 0", "numTasks");
+            }
+            if (_commGroups.ContainsKey(groupName))
+            {
+                throw new ArgumentException("Group Name already registered with MpiDriver", "groupName");
+            }
+
+            var commGroup = new CommunicationGroupDriver(groupName, _driverId, numTasks, _configSerializer);
+            _commGroups.Add(groupName, commGroup);
+            return commGroup;
+        }
+
+        /// <summary>
+        /// Generates context configuration with a unique identifier.
+        /// </summary>
+        /// <returns>The configured context configuration</returns>
+        public IConfiguration GetContextConfiguration()
+        {
+            int contextNum = Interlocked.Increment(ref _contextIds);
+            string id = (contextNum == 0) ? MasterTaskContextName : GetSlaveTaskContextName(contextNum);
+
+            return ContextConfiguration.ConfigurationModule
+                .Set(ContextConfiguration.Identifier, id)
+                .Build();
+        }
+
+        /// <summary>
+        /// Get the service configuration required for running MPI on Reef tasks.
+        /// </summary>
+        /// <returns>The service configuration for the Reef tasks</returns>
+        public IConfiguration GetServiceConfiguration()
+        {
+            IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule
+                .Set(ServiceConfiguration.Services, GenericType<NetworkService<GroupCommunicationMessage>>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
+                .BindImplementation(
+                    GenericType<IObserver<NsMessage<GroupCommunicationMessage>>>.Class, GenericType<MpiNetworkObserver>.Class)
+                .BindImplementation(
+                    GenericType<ICodec<GroupCommunicationMessage>>.Class, GenericType<GroupCommunicationMessageCodec>.Class)
+                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class, _nameServerAddr)
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class, _nameServerPort.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        /// <summary>
+        /// Get the configuration for a particular task.  
+        ///
+        /// The task may belong to many Communication Groups, so each one is serialized
+        /// in the configuration as a SerializedGroupConfig.
+        ///
+        /// The user must merge their part of task configuration (task id, task class)
+        /// with this returned MPI task configuration.
+        /// </summary>
+        /// <param name="taskId">The id of the task Configuration to generate</param>
+        /// <returns>The MPI task configuration with communication group and
+        /// operator configuration set.</returns>
+        public IConfiguration GetMpiTaskConfiguration(string taskId)
+        {
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder();
+
+            foreach (ICommunicationGroupDriver commGroup in _commGroups.Values)
+            {
+                var taskConf = commGroup.GetGroupTaskConfiguration(taskId);
+                if (taskConf != null) // presumably null when the task is not part of this group - confirm
+                {
+                    confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedGroupConfigs, string>(
+                        GenericType<MpiConfigurationOptions.SerializedGroupConfigs>.Class,
+                        _configSerializer.ToString(taskConf));
+                }
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Checks whether this active context can be used to run the Master Task.
+        /// </summary>
+        /// <param name="activeContext">The active context to check</param>
+        /// <returns>True if the active context can run the Master task,
+        /// otherwise false.</returns>
+        public bool IsMasterTaskContext(IActiveContext activeContext)
+        {
+            return activeContext.Id.Equals(MasterTaskContextName);
+        }
+
+        /// <summary>
+        /// Checks whether this context configuration is used to configure the Master Task.
+        /// </summary>
+        /// <param name="contextConfiguration">The context configuration to check</param>
+        /// <returns>True if the context configuration is used to configure the Master
+        /// Task, otherwise false.</returns>
+        public bool IsMasterContextConfiguration(IConfiguration contextConfiguration)
+        {
+            return Utilities.Utils.GetContextId(contextConfiguration).Equals(MasterTaskContextName);
+        }
+
+        /// <summary>
+        /// Gets the context number associated with the Active Context id.
+        /// </summary>
+        /// <param name="activeContext">The active context to check</param>
+        /// <returns>The context number associated with the active context id</returns>
+        /// <exception cref="ArgumentException">The id is neither the master context name
+        /// nor of the form "name-number"</exception>
+        public int GetContextNum(IActiveContext activeContext)
+        {
+            if (activeContext.Id.Equals(MasterTaskContextName))
+            {
+                return 0;
+            }
+
+            string[] parts = activeContext.Id.Split('-');
+            int contextNum;
+            if (parts.Length != 2 ||
+                !int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out contextNum))
+            {
+                throw new ArgumentException("Invalid id in active context");
+            }
+
+            return contextNum;
+        }
+
+        private string GetSlaveTaskContextName(int contextNum)
+        {
+            return string.Format(CultureInfo.InvariantCulture, "{0}-{1}", SlaveTaskContextName, contextNum);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
new file mode 100644
index 0000000..48463dc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Helper class to start MPI tasks.
+    /// </summary>
+    public class TaskStarter
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStarter));
+
+        private readonly object _lock = new object();
+        private readonly int _numTasks;
+        private readonly string _masterTaskId;
+        private readonly IMpiDriver _mpiDriver;
+        private readonly List<Tuple<string, IConfiguration, IActiveContext>> _taskTuples;
+        private int _tasksAdded; // number of queued tasks; only touched while holding _lock
+
+        /// <summary>
+        /// Create new TaskStarter.
+        /// After adding the correct number of tasks to the TaskStarter, the
+        /// Tasks will be started on their given active context.
+        /// </summary>
+        /// <param name="mpiDriver">The IMpiDriver for the MPI tasks</param>
+        /// <param name="numTasks">The number of Tasks that need to be added before
+        /// the Tasks will be started. Must be greater than 0.</param>
+        /// <exception cref="ArgumentNullException">mpiDriver is null</exception>
+        /// <exception cref="ArgumentException">numTasks is less than 1</exception>
+        public TaskStarter(IMpiDriver mpiDriver, int numTasks)
+        {
+            if (mpiDriver == null)
+            {
+                throw new ArgumentNullException("mpiDriver");
+            }
+            if (numTasks < 1)
+            {
+                throw new ArgumentException("NumTasks must be greater than 0", "numTasks");
+            }
+            LOGGER.Log(Level.Verbose, "Creating TaskStarter");
+            _mpiDriver = mpiDriver;
+            _masterTaskId = mpiDriver.MasterTaskId;
+            _numTasks = numTasks;
+            _taskTuples = new List<Tuple<string, IConfiguration, IActiveContext>>();
+        }
+
+        /// <summary>
+        /// Queues the task into the TaskStarter.
+        /// Once the correct number of tasks have been queued, the final Configuration
+        /// will be generated and run on the given Active Context. Tasks queued after
+        /// that point are stored but not started: the count is only compared for equality.
+        /// </summary>
+        /// <param name="partialTaskConfig">The partial task configuration containing Task
+        /// identifier and Task class</param>
+        /// <param name="activeContext">The Active Context to run the Task on</param>
+        /// <exception cref="ArgumentNullException">activeContext is null</exception>
+        public void QueueTask(IConfiguration partialTaskConfig, IActiveContext activeContext)
+        {
+            if (activeContext == null)
+            {
+                throw new ArgumentNullException("activeContext");
+            }
+
+            string taskId = Utils.GetTaskId(partialTaskConfig);
+            LOGGER.Log(Level.Verbose, "Adding task with identifier: " + taskId);
+
+            lock (_lock)
+            {
+                _taskTuples.Add(Tuple.Create(taskId, partialTaskConfig, activeContext));
+                // The lock already serializes this counter, so no Interlocked call is needed.
+                if (++_tasksAdded == _numTasks)
+                {
+                    StartTasks();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Starts the Master Task followed by the Slave Tasks.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">Not exactly one queued task has the master task id</exception>
+        private void StartTasks()
+        {
+            var masterTasks = _taskTuples.Where(tuple => tuple.Item1.Equals(_masterTaskId)).ToList();
+            if (masterTasks.Count != 1)
+            {
+                LOGGER.Log(Level.Error, "There must be exactly one master task. The driver has been misconfigured.");
+                throw new InvalidOperationException("There must be exactly one master task. The driver has been misconfigured.");
+            }
+
+            var masterTask = masterTasks[0];
+            LOGGER.Log(Level.Verbose, "Starting master task on context id: {0}.", masterTask.Item3.Id);
+            StartTask(masterTask.Item1, masterTask.Item2, masterTask.Item3);
+
+            LOGGER.Log(Level.Verbose, "Starting slave tasks.");
+            foreach (var slaveTask in _taskTuples.Where(tuple => !tuple.Item1.Equals(_masterTaskId)))
+            {
+                StartTask(slaveTask.Item1, slaveTask.Item2, slaveTask.Item3);
+            }
+        }
+
+        /// <summary>Merges the user's partial task configuration with the MPI task configuration and submits it.</summary>
+        private void StartTask(string taskId, IConfiguration userPartialTaskConf, IActiveContext activeContext)
+        {
+            IConfiguration mpiTaskConfiguration = _mpiDriver.GetMpiTaskConfiguration(taskId);
+            activeContext.SubmitTask(Configurations.Merge(userPartialTaskConf, mpiTaskConfiguration));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
new file mode 100644
index 0000000..09c8c30
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator that receives messages broadcast by an <see cref="IBroadcastSender{T}"/>.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IBroadcastReceiver<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Receives a message from the parent broadcast sender.
+        /// </summary>
+        /// <returns>The incoming message (presumably blocks until one arrives; confirm in implementations)</returns>
+        T Receive();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
new file mode 100644
index 0000000..534fa9f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator used to broadcast messages to the <see cref="IBroadcastReceiver{T}"/> operators of child tasks.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IBroadcastSender<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Sends the data to all broadcast receivers.
+        /// </summary>
+        /// <param name="data">The data to send</param>
+        void Send(T data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
new file mode 100644
index 0000000..661f348
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// Base interface of the MPI operators used in a REEF task; exposes the operator's name, group and version.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IMpiOperator<T>
+    {
+        /// <summary>
+        /// Gets the operator name.
+        /// </summary>
+        string OperatorName { get; }
+
+        /// <summary>
+        /// Gets the name of the CommunicationGroup this operator belongs to.
+        /// </summary>
+        string GroupName { get; }
+
+        /// <summary>
+        /// Gets the operator version number.
+        /// </summary>
+        int Version { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
new file mode 100644
index 0000000..5d18677
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// The specification used to define an MPI operator (e.g. a Broadcast operator) whose messages are of type T.
+    /// </summary>
+    public interface IOperatorSpec<T>
+    {
+        /// <summary>
+        /// Gets the codec used to serialize and deserialize messages of type T.
+        /// </summary>
+        ICodec<T> Codec { get; }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
new file mode 100644
index 0000000..020b09a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// Interface for the function used to aggregate messages sent by <see cref="IReduceSender{T}"/> operators.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IReduceFunction<T>
+    {
+        /// <summary>
+        /// Reduces the given sequence of messages into a single message.
+        /// </summary>
+        /// <param name="elements">The messages to reduce</param>
+        /// <returns>The reduced message</returns>
+        T Reduce(IEnumerable<T> elements);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
new file mode 100644
index 0000000..2305968
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator used to receive messages and reduce them into a single value.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IReduceReceiver<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Gets the <see cref="IReduceFunction{T}"/> used to reduce incoming messages sent by ReduceSenders.
+        /// </summary>
+        IReduceFunction<T> ReduceFunction { get; } 
+
+        /// <summary>
+        /// Receives the messages sent by all ReduceSenders and aggregates them
+        /// using the operator's <see cref="ReduceFunction"/>.
+        /// </summary>
+        /// <returns>The single aggregated value</returns>
+        T Reduce();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
new file mode 100644
index 0000000..c15ded6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator used to send messages to be reduced by an <see cref="IReduceReceiver{T}"/>.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IReduceSender<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Sends data to the operator's reduce receiver, where it is aggregated.
+        /// </summary>
+        /// <param name="data">The data to send</param>
+        void Send(T data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
new file mode 100644
index 0000000..7aa4e81
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator used to receive the sublist of messages that an
+    /// IScatterSender scattered to this receiver.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface IScatterReceiver<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Receives a sublist of the messages sent from the IScatterSender.
+        /// </summary>
+        /// <returns>The sublist of messages</returns>
+        List<T> Receive();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
new file mode 100644
index 0000000..c5a2c3d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+    /// <summary>
+    /// MPI operator on the sending side of a scatter: it splits a list of
+    /// elements and sends the pieces to the <see cref="IScatterReceiver{T}"/>s.
+    /// </summary>
+    /// <typeparam name="T">The type of each element in the scattered list.</typeparam>
+    public interface IScatterSender<T> : IMpiOperator<T>
+    {
+        /// <summary>
+        /// Split up the list of elements evenly and scatter each chunk
+        /// to the IScatterReceivers.
+        /// </summary>
+        /// <param name="elements">The full list of elements to split and send.</param>
+        void Send(List<T> elements);
+
+        /// <summary>
+        /// Split up the list of elements and scatter each chunk
+        /// to the IScatterReceivers.  Each receiver will receive
+        /// a sublist of the specified size.
+        /// </summary>
+        /// <param name="elements">The full list of elements to split and send.</param>
+        /// <param name="count">The number of elements in each receiver's sublist.</param>
+        void Send(List<T> elements, int count);
+
+        /// <summary>
+        /// Split up the list of elements and scatter each chunk
+        /// to the IScatterReceivers in the specified task order.
+        /// </summary>
+        /// <param name="elements">The full list of elements to split and send.</param>
+        /// <param name="order">The list of task identifiers representing
+        /// the order in which to scatter each sublist.</param>
+        void Send(List<T> elements, List<string> order);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
new file mode 100644
index 0000000..904e4ef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Describes a Broadcast operator: the sending Task and the message codec.
+    /// </summary>
+    /// <typeparam name="T">The type of message the codec handles.</typeparam>
+    public class BroadcastOperatorSpec<T> : IOperatorSpec<T>
+    {
+        /// <summary>
+        /// Builds the spec from the root sender's identifier and the message codec.
+        /// </summary>
+        /// <param name="senderId">Identifier of the root Task that sends.</param>
+        /// <param name="codecType">Codec instance exposed through the Codec property.</param>
+        public BroadcastOperatorSpec(string senderId, ICodec<T> codecType)
+        {
+            Codec = codecType;
+            SenderId = senderId;
+        }
+
+        /// <summary>Gets the identifier of the Task that will broadcast data to other Tasks.</summary>
+        public string SenderId { get; private set; }
+
+        /// <summary>
+        /// Gets the ICodec used to serialize messages.
+        /// </summary>
+        public ICodec<T> Codec { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
new file mode 100644
index 0000000..4374ab5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Receiving side of the MPI Broadcast operator.
+    /// </summary>
+    /// <typeparam name="T">The type of the broadcast message.</typeparam>
+    public class BroadcastReceiver<T> : IBroadcastReceiver<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly OperatorTopology<T> _topology;
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+
+        /// <summary>
+        /// Builds the receiver, initializes its topology and subscribes to incoming messages.
+        /// </summary>
+        /// <param name="operatorName">Identifier of this operator</param>
+        /// <param name="groupName">Name of the CommunicationGroup this
+        /// operator is part of</param>
+        /// <param name="topology">Topology graph for this node</param>
+        /// <param name="networkHandler">Handler for incoming messages; the operator registers with it</param>
+        [Inject]
+        public BroadcastReceiver(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            Version = DefaultVersion;
+            GroupName = groupName;
+            OperatorName = operatorName;
+
+            _topology = topology;
+            _networkHandler = networkHandler;
+            _topology.Initialize();
+
+            var observer = Observer.Create<GroupCommunicationMessage>(msg => _topology.OnNext(msg));
+            _networkHandler.Register(OperatorName, observer);
+        }
+
+        /// <summary>
+        /// Gets the operator identifier.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Gets the name of the CommunicationGroup that the operator belongs to.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Gets the operator version; the constructor sets it to DefaultVersion.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Receives a message from the parent node of the topology.
+        /// </summary>
+        /// <returns>The message obtained from the parent</returns>
+        public T Receive()
+        {
+            return _topology.ReceiveFromParent();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
new file mode 100644
index 0000000..4e48428
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Sending side of the MPI Broadcast operator; pushes messages to child Tasks.
+    /// </summary>
+    /// <typeparam name="T">The type of the broadcast message.</typeparam>
+    public class BroadcastSender<T> : IBroadcastSender<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly OperatorTopology<T> _topology;
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+
+        /// <summary>
+        /// Builds the sender, initializes its topology and subscribes to incoming messages.
+        /// </summary>
+        /// <param name="operatorName">Identifier of this operator</param>
+        /// <param name="groupName">Name of the CommunicationGroup this operator is part of</param>
+        /// <param name="topology">Topology graph for this node</param>
+        /// <param name="networkHandler">Handler for incoming messages; the operator registers with it</param>
+        [Inject]
+        public BroadcastSender(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            Version = DefaultVersion;
+            GroupName = groupName;
+            OperatorName = operatorName;
+
+            _topology = topology;
+            _networkHandler = networkHandler;
+            _topology.Initialize();
+
+            var observer = Observer.Create<GroupCommunicationMessage>(msg => _topology.OnNext(msg));
+            _networkHandler.Register(OperatorName, observer);
+        }
+
+        /// <summary>
+        /// Gets the identifier for the MPI operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Gets the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Gets the operator version; the constructor sets it to DefaultVersion.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Sends the data to this node's children in the topology as a Data message.
+        /// </summary>
+        /// <param name="data">The data to send; must not be null.</param>
+        /// <exception cref="ArgumentNullException">Thrown when data is null.</exception>
+        public void Send(T data)
+        {
+            if (data == null)
+            {
+                throw new ArgumentNullException("data");
+            }
+
+            _topology.SendToChildren(data, MessageType.Data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
new file mode 100644
index 0000000..bc60055
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>Reduces a sequence with a binary delegate and an optional seed value.</summary>
+    /// <typeparam name="T">The type of the elements and of the reduced result.</typeparam>
+    public class ReduceFunction<T> : IReduceFunction<T>
+    {
+        private readonly Func<T, T, T> _reduceFunction;
+        private readonly T _initialValue;
+        // Explicit flag: for a value-type T, "_initialValue == null" can never detect a missing seed.
+        private readonly bool _hasInitialValue;
+
+        private ReduceFunction(Func<T, T, T> reduceFunction, T initialValue, bool hasInitialValue)
+        {
+            _reduceFunction = reduceFunction;
+            _initialValue = initialValue;
+            _hasInitialValue = hasInitialValue;
+        }
+
+        /// <summary>Creates a function that folds the elements pairwise, without a seed.</summary>
+        public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction)
+        {
+            return new ReduceFunction<T>(reduceFunction, default(T), false);
+        }
+
+        /// <summary>Creates a function seeded with initialValue; a null seed means "no seed".</summary>
+        public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction, T initialValue)
+        {
+            return new ReduceFunction<T>(reduceFunction, initialValue, initialValue != null);
+        }
+
+        /// <summary>Folds elements into one value; unseeded, an empty sequence makes Aggregate throw.</summary>
+        public T Reduce(IEnumerable<T> elements)
+        {
+            return _hasInitialValue
+                ? elements.Aggregate(_initialValue, _reduceFunction)
+                : elements.Aggregate(_reduceFunction);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
new file mode 100644
index 0000000..37b6ce7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Describes a Reduce MPI operator: receiving task, codec and aggregation function.
+    /// </summary>
+    /// <typeparam name="T">The type of message being reduced.</typeparam>
+    public class ReduceOperatorSpec<T> : IOperatorSpec<T>
+    {
+        /// <summary>
+        /// Builds the spec from its three components.
+        /// </summary>
+        /// <param name="receiverId">Identifier of the task that receives and reduces messages.</param>
+        /// <param name="codec">Codec used for serializing messages.</param>
+        /// <param name="reduceFunction">Object used to aggregate all messages.</param>
+        public ReduceOperatorSpec(
+            string receiverId,
+            ICodec<T> codec,
+            IReduceFunction<T> reduceFunction)
+        {
+            ReduceFunction = reduceFunction;
+            Codec = codec;
+            ReceiverId = receiverId;
+        }
+
+        /// <summary>
+        /// Gets the identifier of the task that receives and reduces
+        /// incoming messages.
+        /// </summary>
+        public string ReceiverId { get; private set; }
+
+        /// <summary>
+        /// Gets the codec used to serialize and deserialize messages.
+        /// </summary>
+        public ICodec<T> Codec { get; private set; }
+
+        /// <summary>
+        /// Gets the object used to aggregate incoming messages.
+        /// </summary>
+        public IReduceFunction<T> ReduceFunction { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
new file mode 100644
index 0000000..3c722ef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Receiving side of the MPI Reduce operator; aggregates incoming messages.
+    /// </summary>
+    /// <typeparam name="T">The type of message being reduced.</typeparam>
+    public class ReduceReceiver<T> : IReduceReceiver<T>
+    {
+        private const int DefaultVersion = 1;
+
+        private readonly OperatorTopology<T> _topology;
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+
+        /// <summary>
+        /// Builds the receiver, initializes its topology and subscribes to incoming messages.
+        /// </summary>
+        /// <param name="operatorName">Name of this reduce operator</param>
+        /// <param name="groupName">Name of the CommunicationGroup this operator is part of</param>
+        /// <param name="topology">Operator topology graph for this task</param>
+        /// <param name="networkHandler">Handler for incoming messages; the operator registers with it</param>
+        /// <param name="reduceFunction">Object used to aggregate all incoming messages</param>
+        [Inject]
+        public ReduceReceiver(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler,
+            IReduceFunction<T> reduceFunction)
+        {
+            ReduceFunction = reduceFunction;
+            Version = DefaultVersion;
+            GroupName = groupName;
+            OperatorName = operatorName;
+
+            _topology = topology;
+            _networkHandler = networkHandler;
+            _topology.Initialize();
+
+            var observer = Observer.Create<GroupCommunicationMessage>(msg => _topology.OnNext(msg));
+            _networkHandler.Register(OperatorName, observer);
+        }
+
+        /// <summary>
+        /// Gets the name of the reduce operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Gets the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Gets the operator version; the constructor sets it to DefaultVersion.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Gets the object used to reduce incoming messages sent by ReduceSenders.
+        /// </summary>
+        public IReduceFunction<T> ReduceFunction { get; private set; }
+
+        /// <summary>
+        /// Asks the topology for the messages of the child tasks, passing
+        /// ReduceFunction so that they are aggregated into a single value.
+        /// </summary>
+        /// <returns>The single aggregated value</returns>
+        public T Reduce()
+        {
+            return _topology.ReceiveFromChildren(ReduceFunction);
+        }
+    }
+}