You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/11 23:58:08 UTC
[3/4] incubator-reef git commit: [REEF-150] Adding group
communication to REEF .Net
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
new file mode 100644
index 0000000..423e916
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+ public class MpiConfigurationOptions
+ {
+ [NamedParameter("Name of the communication group")]
+ public class CommunicationGroupName : Name<string>
+ {
+ }
+
+ [NamedParameter("Name of the MPI operator")]
+ public class OperatorName : Name<string>
+ {
+ }
+
+ [NamedParameter("Driver identifier")]
+ public class DriverId : Name<string>
+ {
+ }
+
+ [NamedParameter("Master task identifier")]
+ public class MasterTaskId : Name<string>
+ {
+ }
+
+ [NamedParameter("Serialized communication group configuration")]
+ public class SerializedGroupConfigs : Name<ISet<string>>
+ {
+ }
+
+ [NamedParameter("Serialized operator configuration")]
+ public class SerializedOperatorConfigs : Name<ISet<string>>
+ {
+ }
+
+ [NamedParameter("Id of root task in operator topology")]
+ public class TopologyRootTaskId : Name<string>
+ {
+ }
+
+ [NamedParameter("Ids of child tasks in operator topology")]
+ public class TopologyChildTaskIds : Name<ISet<string>>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
new file mode 100644
index 0000000..a2a249d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Interface;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Group.Driver
+{
+ /// <summary>
+ /// Used to configure MPI operators in Reef driver.
+ /// All operators in the same Communication Group run on the the
+ /// same set of tasks.
+ /// </summary>
+ public interface ICommunicationGroupDriver
+ {
+ /// <summary>
+ /// Returns the list of task ids that belong to this Communication Group
+ /// </summary>
+ List<string> TaskIds { get; }
+
+ /// <summary>
+ /// Adds the Broadcast MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the broadcast operator</param>
+ /// <param name="spec">The specification that defines the Broadcast operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
+ ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> spec);
+
+ /// <summary>
+ /// Adds the Reduce MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the reduce operator</param>
+ /// <param name="spec">The specification that defines the Reduce operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
+ ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec);
+
+ /// <summary>
+ /// Adds the Scatter MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the scatter operator</param>
+ /// <param name="spec">The specification that defines the Scatter operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
+ ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec);
+
+ /// <summary>
+ /// Finalizes the CommunicationGroupDriver.
+ /// After the CommunicationGroupDriver has been finalized, no more operators may
+ /// be added to the group.
+ /// </summary>
+ /// <returns>The same finalized CommunicationGroupDriver</returns>
+ ICommunicationGroupDriver Build();
+
+ /// <summary>
+ /// Add a task to the communication group.
+ /// The CommunicationGroupDriver must have called Build() before adding tasks to the group.
+ /// </summary>
+ /// <param name="taskId">The id of the task to add</param>
+ void AddTask(string taskId);
+
+ /// <summary>
+ /// Get the Task Configuration for this communication group.
+ /// Must be called only after all tasks have been added to the CommunicationGroupDriver.
+ /// </summary>
+ /// <param name="taskId">The task id of the task that belongs to this Communication Group</param>
+ /// <returns>The Task Configuration for this communication group</returns>
+ IConfiguration GetGroupTaskConfiguration(string taskId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
new file mode 100644
index 0000000..422d63e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Network.Group.Driver
+{
+ /// <summary>
+ /// Used to create Communication Groups for MPI Operators.
+ /// Also manages configuration for MPI tasks/services.
+ /// </summary>
+ public interface IMpiDriver
+ {
+ /// <summary>
+ /// Returns the identifier for the master task
+ /// </summary>
+ string MasterTaskId { get; }
+
+ /// <summary>
+ /// Create a new CommunicationGroup with the given name and number of tasks/operators.
+ /// </summary>
+ /// <param name="groupName">The new group name</param>
+ /// <param name="numTasks">The number of tasks/operators in the group.</param>
+ /// <returns>The new Communication Group</returns>
+ ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks);
+
+ /// <summary>
+ /// Generates context configuration with a unique identifier.
+ /// </summary>
+ /// <returns>The configured context configuration</returns>
+ IConfiguration GetContextConfiguration();
+
+ /// <summary>
+ /// Get the service configuration required for running MPI on Reef tasks.
+ /// </summary>
+ /// <returns>The service configuration for the Reef tasks</returns>
+ IConfiguration GetServiceConfiguration();
+
+ /// <summary>
+ /// Checks whether this active context can be used to run the Master Task.
+ /// </summary>
+ /// <param name="activeContext">The active context to check</param>
+ /// <returns>True if the active context can run the Master task,
+ /// otherwise false.</returns>
+ bool IsMasterTaskContext(IActiveContext activeContext);
+
+ /// <summary>
+ /// Checks whether this context configuration is used to configure the Master Task.
+ /// </summary>
+ /// <param name="contextConfiguration">The context configuration to check</param>
+ /// <returns>True if the context configuration is used to configure the Master
+ /// Task, otherwise false.</returns>
+ bool IsMasterContextConfiguration(IConfiguration contextConfiguration);
+
+ /// <summary>
+ /// Gets the context number associated with the Active Context id.
+ /// </summary>
+ /// <param name="activeContext">The active context to check</param>
+ /// <returns>The context number associated with the active context id</returns>
+ int GetContextNum(IActiveContext activeContext);
+
+ /// <summary>
+ /// Get the configuration for a particular task.
+ ///
+ /// The task may belong to many Communication Groups, so each one is serialized
+ /// in the configuration as a SerializedGroupConfig.
+ ///
+ /// The user must merge their part of task configuration (task id, task class)
+ /// with this returned MPI task configuration.
+ /// </summary>
+ /// <param name="taskId">The id of the task Configuration to generate</param>
+ /// <returns>The MPI task configuration with communication group and
+ /// operator configuration set.</returns>
+ IConfiguration GetMpiTaskConfiguration(string taskId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
new file mode 100644
index 0000000..d29c504
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Used to configure MPI operators in Reef driver.
+ /// All operators in the same Communication Group run on the the
+ /// same set of tasks.
+ /// </summary>
+ public class CommunicationGroupDriver : ICommunicationGroupDriver
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupDriver));
+
+ private string _groupName;
+ private string _driverId;
+ private int _numTasks;
+ private int _tasksAdded;
+ private bool _finalized;
+
+ private AvroConfigurationSerializer _confSerializer;
+
+ private object _topologyLock;
+ private Dictionary<string, object> _operatorSpecs;
+ private Dictionary<string, object> _topologies;
+
+ /// <summary>
+ /// Create a new CommunicationGroupDriver.
+ /// </summary>
+ /// <param name="groupName">The communication group name</param>
+ /// <param name="driverId">Identifier of the Reef driver</param>
+ /// <param name="numTasks">The number of tasks each operator will use</param>
+ /// <param name="confSerializer">Used to serialize task configuration</param>
+ public CommunicationGroupDriver(
+ string groupName,
+ string driverId,
+ int numTasks,
+ AvroConfigurationSerializer confSerializer)
+ {
+ _confSerializer = confSerializer;
+ _groupName = groupName;
+ _driverId = driverId;
+ _numTasks = numTasks;
+ _tasksAdded = 0;
+ _finalized = false;
+
+ _topologyLock = new object();
+
+ _operatorSpecs = new Dictionary<string, object>();
+ _topologies = new Dictionary<string, object>();
+ TaskIds = new List<string>();
+ }
+
+ /// <summary>
+ /// Returns the list of task ids that belong to this Communication Group
+ /// </summary>
+ public List<string> TaskIds { get; private set; }
+
+ /// <summary>
+ /// Adds the Broadcast MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the broadcast operator</param>
+ /// <param name="spec">The specification that defines the Broadcast operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
+ public ICommunicationGroupDriver AddBroadcast<T>(
+ string operatorName,
+ BroadcastOperatorSpec<T> spec)
+ {
+ if (_finalized)
+ {
+ throw new IllegalStateException("Can't add operators once the spec has been built.");
+ }
+
+ ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+ _topologies[operatorName] = topology;
+ _operatorSpecs[operatorName] = spec;
+
+ return this;
+ }
+
+ /// <summary>
+ /// Adds the Reduce MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the reduce operator</param>
+ /// <param name="spec">The specification that defines the Reduce operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
+ public ICommunicationGroupDriver AddReduce<T>(
+ string operatorName,
+ ReduceOperatorSpec<T> spec)
+ {
+ if (_finalized)
+ {
+ throw new IllegalStateException("Can't add operators once the spec has been built.");
+ }
+
+ ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
+ _topologies[operatorName] = topology;
+ _operatorSpecs[operatorName] = spec;
+
+ return this;
+ }
+
+ /// <summary>
+ /// Adds the Scatter MPI operator to the communication group.
+ /// </summary>
+ /// <typeparam name="T">The type of messages that operators will send</typeparam>
+ /// <param name="operatorName">The name of the scatter operator</param>
+ /// <param name="spec">The specification that defines the Scatter operator</param>
+ /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
+ public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec)
+ {
+ if (_finalized)
+ {
+ throw new IllegalStateException("Can't add operators once the spec has been built.");
+ }
+
+ ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+ _topologies[operatorName] = topology;
+ _operatorSpecs[operatorName] = spec;
+
+ return this;
+ }
+
+ /// <summary>
+ /// Finalizes the CommunicationGroupDriver.
+ /// After the CommunicationGroupDriver has been finalized, no more operators may
+ /// be added to the group.
+ /// </summary>
+ /// <returns>The same finalized CommunicationGroupDriver</returns>
+ public ICommunicationGroupDriver Build()
+ {
+ _finalized = true;
+ return this;
+ }
+
+ /// <summary>
+ /// Add a task to the communication group.
+ /// The CommunicationGroupDriver must have called Build() before adding tasks to the group.
+ /// </summary>
+ /// <param name="taskId">The id of the task to add</param>
+ public void AddTask(string taskId)
+ {
+ if (!_finalized)
+ {
+ throw new IllegalStateException("CommunicationGroupDriver must call Build() before adding tasks to the group.");
+ }
+
+ lock (_topologyLock)
+ {
+ _tasksAdded++;
+ if (_tasksAdded > _numTasks)
+ {
+ throw new IllegalStateException("Added too many tasks to Communication Group, expected: " + _numTasks);
+ }
+
+ TaskIds.Add(taskId);
+ foreach (string operatorName in _operatorSpecs.Keys)
+ {
+ AddTask(operatorName, taskId);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Get the Task Configuration for this communication group.
+ /// Must be called only after all tasks have been added to the CommunicationGroupDriver.
+ /// </summary>
+ /// <param name="taskId">The task id of the task that belongs to this Communication Group</param>
+ /// <returns>The Task Configuration for this communication group</returns>
+ public IConfiguration GetGroupTaskConfiguration(string taskId)
+ {
+ if (!TaskIds.Contains(taskId))
+ {
+ return null;
+ }
+
+ // Make sure all tasks have been added to communication group before generating config
+ lock (_topologyLock)
+ {
+ if (_tasksAdded != _numTasks)
+ {
+ throw new IllegalStateException(
+ "Must add all tasks to communication group before fetching configuration");
+ }
+ }
+
+ var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
+ GenericType<MpiConfigurationOptions.DriverId>.Class,
+ _driverId)
+ .BindNamedParameter<MpiConfigurationOptions.CommunicationGroupName, string>(
+ GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class,
+ _groupName);
+
+ foreach (var operatorName in _topologies.Keys)
+ {
+ var innerConf = TangFactory.GetTang().NewConfigurationBuilder(GetOperatorConfiguration(operatorName, taskId))
+ .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
+ GenericType<MpiConfigurationOptions.DriverId>.Class,
+ _driverId)
+ .BindNamedParameter<MpiConfigurationOptions.OperatorName, string>(
+ GenericType<MpiConfigurationOptions.OperatorName>.Class,
+ operatorName)
+ .Build();
+
+ confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedOperatorConfigs, string>(
+ GenericType<MpiConfigurationOptions.SerializedOperatorConfigs>.Class,
+ _confSerializer.ToString(innerConf));
+ }
+
+ return confBuilder.Build();
+ }
+
+ private void AddTask(string operatorName, string taskId)
+ {
+ var topology = _topologies[operatorName];
+ MethodInfo info = topology.GetType().GetMethod("AddTask");
+ info.Invoke(topology, new[] { (object) taskId });
+ }
+
+ private IConfiguration GetOperatorConfiguration(string operatorName, string taskId)
+ {
+ var topology = _topologies[operatorName];
+ MethodInfo info = topology.GetType().GetMethod("GetTaskConfiguration");
+ return (IConfiguration) info.Invoke(topology, new[] { (object) taskId });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
new file mode 100644
index 0000000..1439a36
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Messages sent by MPI Operators
+ /// </summary>
+ public class GroupCommunicationMessage
+ {
+ /// <summary>
+ /// Create new CommunicationGroupMessage.
+ /// </summary>
+ /// <param name="groupName">The name of the communication group</param>
+ /// <param name="operatorName">The name of the MPI operator</param>
+ /// <param name="source">The message source</param>
+ /// <param name="destination">The message destination</param>
+ /// <param name="data">The actual byte array of data</param>
+ /// <param name="messageType">The type of message to send</param>
+ public GroupCommunicationMessage(
+ string groupName,
+ string operatorName,
+ string source,
+ string destination,
+ byte[] data,
+ MessageType messageType)
+ {
+ GroupName = groupName;
+ OperatorName = operatorName;
+ Source = source;
+ Destination = destination;
+ Data = new[] { data };
+ MsgType = messageType;
+ }
+
+ /// <summary>
+ /// Create new CommunicationGroupMessage.
+ /// </summary>
+ /// <param name="groupName">The name of the communication group</param>
+ /// <param name="operatorName">The name of the MPI operator</param>
+ /// <param name="source">The message source</param>
+ /// <param name="destination">The message destination</param>
+ /// <param name="data">The actual byte array of data</param>
+ /// <param name="messageType">The type of message to send</param>
+ public GroupCommunicationMessage(
+ string groupName,
+ string operatorName,
+ string source,
+ string destination,
+ byte[][] data,
+ MessageType messageType)
+ {
+ GroupName = groupName;
+ OperatorName = operatorName;
+ Source = source;
+ Destination = destination;
+ Data = data;
+ MsgType = messageType;
+ }
+
+ /// <summary>
+ /// Returns the Communication Group name.
+ /// </summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Returns the MPI Operator name.
+ /// </summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>
+ /// Returns the source of the message.
+ /// </summary>
+ public string Source { get; private set; }
+
+ /// <summary>
+ /// Returns the destination of the message.
+ /// </summary>
+ public string Destination { get; private set; }
+
+ /// <summary>
+ /// Returns the message data.
+ /// </summary>
+ public byte[][] Data { get; private set; }
+
+ /// <summary>
+ /// Returns the type of message being sent.
+ /// </summary>
+ public MessageType MsgType { get; private set; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
new file mode 100644
index 0000000..cd8ace2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Represents the different types of messages that Mpi Tasks can
+ /// send to each other.
+ /// </summary>
+ public enum MessageType
+ {
+ Data = 0
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
new file mode 100644
index 0000000..a373ef3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Net;
+using System.Threading;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Network.Group.Codec;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Used to create Communication Groups for MPI Operators on the Reef driver.
+ /// Also manages configuration for MPI tasks/services.
+ /// </summary>
+ public class MpiDriver : IMpiDriver
+ {
+ private const string MasterTaskContextName = "MasterTaskContext";
+ private const string SlaveTaskContextName = "SlaveTaskContext";
+
+ private static Logger LOGGER = Logger.GetLogger(typeof(MpiDriver));
+
+ private string _driverId;
+ private string _nameServerAddr;
+ private int _nameServerPort;
+ private int _contextIds;
+
+ private Dictionary<string, ICommunicationGroupDriver> _commGroups;
+ private AvroConfigurationSerializer _configSerializer;
+ private NameServer _nameServer;
+
+ /// <summary>
+ /// Create a new MpiDriver object.
+ /// </summary>
+ /// <param name="driverId">Identifer for the REEF driver</param>
+ /// <param name="masterTaskId">Identifer for MPI master task</param>
+ /// <param name="configSerializer">Used to serialize task configuration</param>
+ [Inject]
+ public MpiDriver(
+ [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+ [Parameter(typeof(MpiConfigurationOptions.MasterTaskId))] string masterTaskId,
+ AvroConfigurationSerializer configSerializer)
+ {
+ _driverId = driverId;
+ _contextIds = -1;
+ MasterTaskId = masterTaskId;
+
+ _configSerializer = configSerializer;
+ _commGroups = new Dictionary<string, ICommunicationGroupDriver>();
+ _nameServer = new NameServer(0);
+
+ IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
+ _nameServerAddr = localEndpoint.Address.ToString();
+ _nameServerPort = localEndpoint.Port;
+ }
+
+ /// <summary>
+ /// Returns the identifier for the master task
+ /// </summary>
+ public string MasterTaskId { get; private set; }
+
+ /// <summary>
+ /// Create a new CommunicationGroup with the given name and number of tasks/operators.
+ /// </summary>
+ /// <param name="groupName">The new group name</param>
+ /// <param name="numTasks">The number of tasks/operators in the group.</param>
+ /// <returns>The new Communication Group</returns>
+ public ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks)
+ {
+ if (string.IsNullOrEmpty(groupName))
+ {
+ throw new ArgumentNullException("groupName");
+ }
+ else if (numTasks < 1)
+ {
+ throw new ArgumentException("NumTasks must be greater than 0");
+ }
+ else if (_commGroups.ContainsKey(groupName))
+ {
+ throw new ArgumentException("Group Name already registered with MpiDriver");
+ }
+
+ var commGroup = new CommunicationGroupDriver(groupName, _driverId, numTasks, _configSerializer);
+ _commGroups[groupName] = commGroup;
+ return commGroup;
+ }
+
+ /// <summary>
+ /// Generates context configuration with a unique identifier.
+ /// </summary>
+ /// <returns>The configured context configuration</returns>
+ public IConfiguration GetContextConfiguration()
+ {
+ int contextNum = Interlocked.Increment(ref _contextIds);
+ string id = (contextNum == 0)
+ ? MasterTaskContextName
+ : GetSlaveTaskContextName(contextNum);
+
+ return ContextConfiguration.ConfigurationModule
+ .Set(ContextConfiguration.Identifier, id)
+ .Build();
+ }
+
+ /// <summary>
+ /// Get the service configuration required for running MPI on Reef tasks.
+ /// </summary>
+ /// <returns>The service configuration for the Reef tasks</returns>
+ public IConfiguration GetServiceConfiguration()
+ {
+ IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule
+ .Set(ServiceConfiguration.Services, GenericType<NetworkService<GroupCommunicationMessage>>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
+ .BindImplementation(
+ GenericType<IObserver<NsMessage<GroupCommunicationMessage>>>.Class,
+ GenericType<MpiNetworkObserver>.Class)
+ .BindImplementation(
+ GenericType<ICodec<GroupCommunicationMessage>>.Class,
+ GenericType<GroupCommunicationMessageCodec>.Class)
+ .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+ GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+ _nameServerAddr)
+ .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+ GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+ _nameServerPort.ToString(CultureInfo.InvariantCulture))
+ .Build();
+ }
+
+ /// <summary>
+ /// Get the configuration for a particular task.
+ ///
+ /// The task may belong to many Communication Groups, so each one is serialized
+ /// in the configuration as a SerializedGroupConfig.
+ ///
+ /// The user must merge their part of task configuration (task id, task class)
+ /// with this returned MPI task configuration.
+ /// </summary>
+ /// <param name="taskId">The id of the task Configuration to generate</param>
+ /// <returns>The MPI task configuration with communication group and
+ /// operator configuration set.</returns>
+ public IConfiguration GetMpiTaskConfiguration(string taskId)
+ {
+ var confBuilder = TangFactory.GetTang().NewConfigurationBuilder();
+
+ foreach (ICommunicationGroupDriver commGroup in _commGroups.Values)
+ {
+ var taskConf = commGroup.GetGroupTaskConfiguration(taskId);
+ if (taskConf != null)
+ {
+ confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedGroupConfigs, string>(
+ GenericType<MpiConfigurationOptions.SerializedGroupConfigs>.Class,
+ _configSerializer.ToString(taskConf));
+ }
+ }
+
+ return confBuilder.Build();
+ }
+
+ /// <summary>
+ /// Checks whether this active context can be used to run the Master Task.
+ /// </summary>
+ /// <param name="activeContext">The active context to check</param>
+ /// <returns>True if the active context can run the Master task,
+ /// otherwise false.</returns>
+ public bool IsMasterTaskContext(IActiveContext activeContext)
+ {
+ return activeContext.Id.Equals(MasterTaskContextName);
+ }
+
+ /// <summary>
+ /// Checks whether this context configuration is used to configure the Master Task.
+ /// </summary>
+ /// <param name="contextConfiguration">The context configuration to check</param>
+ /// <returns>True if the context configuration is used to configure the Master
+ /// Task, otherwise false.</returns>
+ public bool IsMasterContextConfiguration(IConfiguration contextConfiguration)
+ {
+ return Utilities.Utils.GetContextId(contextConfiguration).Equals(MasterTaskContextName);
+ }
+
+ /// <summary>
+ /// Gets the context number associated with the Active Context id.
+ /// </summary>
+ /// <param name="activeContext">The active context to check</param>
+ /// <returns>The context number associated with the active context id</returns>
+ public int GetContextNum(IActiveContext activeContext)
+ {
+ if (activeContext.Id.Equals(MasterTaskContextName))
+ {
+ return 0;
+ }
+
+ string[] parts = activeContext.Id.Split('-');
+ if (parts.Length != 2)
+ {
+ throw new ArgumentException("Invalid id in active context");
+ }
+
+ return int.Parse(parts[1], CultureInfo.InvariantCulture);
+ }
+
+ private string GetSlaveTaskContextName(int contextNum)
+ {
+ return string.Format(CultureInfo.InvariantCulture, "{0}-{1}", SlaveTaskContextName, contextNum);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
new file mode 100644
index 0000000..48463dc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Helper class to start MPI tasks.
+ /// </summary>
+ public class TaskStarter
+ {
+ private static Logger LOGGER = Logger.GetLogger(typeof(TaskStarter));
+
+ private object _lock;
+ private int _numTasks;
+ private int _tasksAdded;
+ private string _masterTaskId;
+
+ private IMpiDriver _mpiDriver;
+ private List<Tuple<string, IConfiguration, IActiveContext>> _taskTuples;
+
+ /// <summary>
+ /// Create new TaskStarter.
+ /// After adding the correct number of tasks to the TaskStarter, the
+ /// Tasks will be started on their given active context.
+ /// </summary>
+ /// <param name="mpiDriver">The IMpiDriver for the MPI tasks</param>
+ /// <param name="numTasks">The number of Tasks that need to be added before
+ /// the Tasks will be started. </param>
+ public TaskStarter(IMpiDriver mpiDriver, int numTasks)
+ {
+ LOGGER.Log(Level.Verbose, "Creating TaskStarter");
+ _masterTaskId = mpiDriver.MasterTaskId;
+ _numTasks = numTasks;
+ _tasksAdded = 0;
+ _lock = new object();
+
+ _mpiDriver = mpiDriver;
+ _taskTuples = new List<Tuple<string, IConfiguration, IActiveContext>>();
+ }
+
+ /// <summary>
+ /// Queues the task into the TaskStarter.
+ ///
+ /// Once the correct number of tasks have been queued, the final Configuration
+ /// will be generated and run on the given Active Context.
+ /// </summary>
+ /// <param name="partialTaskConfig">The partial task configuration containing Task
+ /// identifier and Task class</param>
+ /// <param name="activeContext">The Active Context to run the Task on</param>
+ public void QueueTask(IConfiguration partialTaskConfig, IActiveContext activeContext)
+ {
+ string taskId = Utils.GetTaskId(partialTaskConfig);
+ LOGGER.Log(Level.Verbose, "Adding context with identifier: " + taskId);
+
+ lock (_lock)
+ {
+ _taskTuples.Add(
+ new Tuple<string, IConfiguration, IActiveContext>(taskId, partialTaskConfig, activeContext));
+
+ if (Interlocked.Increment(ref _tasksAdded) == _numTasks)
+ {
+ StartTasks();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Starts the Master Task followed by the Slave Tasks.
+ /// </summary>
+ private void StartTasks()
+ {
+ Tuple<string, IConfiguration, IActiveContext> masterTaskTuple;
+ try
+ {
+ masterTaskTuple = _taskTuples.Single(tuple => tuple.Item1.Equals(_masterTaskId));
+ }
+ catch (InvalidOperationException)
+ {
+ LOGGER.Log(Level.Error, "There must be exactly one master task. The driver has been misconfigured.");
+ throw;
+ }
+
+ LOGGER.Log(Level.Verbose, "Starting master task on context id: {0}.", masterTaskTuple.Item3.Id);
+ StartTask(masterTaskTuple.Item1, masterTaskTuple.Item2, masterTaskTuple.Item3);
+
+ LOGGER.Log(Level.Verbose, "Starting slave tasks.");
+ foreach (Tuple<string, IConfiguration, IActiveContext> taskTuple in _taskTuples)
+ {
+ string taskId = taskTuple.Item1;
+ if (taskId.Equals(_masterTaskId))
+ {
+ continue;
+ }
+
+ StartTask(taskId, taskTuple.Item2, taskTuple.Item3);
+ }
+ }
+
+ private void StartTask(
+ string taskId,
+ IConfiguration userPartialTaskConf,
+ IActiveContext activeContext)
+ {
+ IConfiguration mpiTaskConfiguration = _mpiDriver.GetMpiTaskConfiguration(taskId);
+ IConfiguration mergedTaskConf = Configurations.Merge(userPartialTaskConf, mpiTaskConfiguration);
+ activeContext.SubmitTask(mergedTaskConf);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
new file mode 100644
index 0000000..09c8c30
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI Operator used to receive broadcast messages.
+ /// </summary>
+ /// <typeparam name="T">The type of message being sent.</typeparam>
+ public interface IBroadcastReceiver<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Receive a message from parent BroadcastSender.
+ /// </summary>
+ /// <returns>The incoming message</returns>
+ T Receive();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
new file mode 100644
index 0000000..534fa9f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI Operator used to send messages to child Tasks.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IBroadcastSender<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Send the data to all BroadcastReceivers.
+ /// </summary>
+ /// <param name="data">The data to send.</param>
+ void Send(T data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
new file mode 100644
index 0000000..661f348
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// An Mpi Operator to be used in a Reef Task.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IMpiOperator<T>
+ {
+ /// <summary>
+ /// The operator name.
+ /// </summary>
+ string OperatorName { get; }
+
+ /// <summary>
+ /// The name of the operator's CommunicationGroup.
+ /// </summary>
+ string GroupName { get; }
+
+ /// <summary>
+ /// The operator version number.
+ /// </summary>
+ int Version { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
new file mode 100644
index 0000000..5d18677
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// The specification used to define Broadcast Operators.
+ /// </summary>
+ public interface IOperatorSpec<T>
+ {
+ /// <summary>
+ /// Returns the codec used to serialize and deserialize messages.
+ /// </summary>
+ ICodec<T> Codec { get; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
new file mode 100644
index 0000000..020b09a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// The class used to aggregate messages sent by ReduceSenders.
+ /// </summary>
+ /// <typeparam name="T">The message type.</typeparam>
+ public interface IReduceFunction<T>
+ {
+ /// <summary>
+ /// Reduce the IEnumerable of messages into one message.
+ /// </summary>
+ /// <param name="elements">The messages to reduce</param>
+ /// <returns>The reduced message</returns>
+ T Reduce(IEnumerable<T> elements);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
new file mode 100644
index 0000000..2305968
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI operator used to receive and reduce messages.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IReduceReceiver<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Returns the class used to reduce incoming messages sent by ReduceSenders.
+ /// </summary>
+ IReduceFunction<T> ReduceFunction { get; }
+
+ /// <summary>
+ /// Receives messages sent by all ReduceSenders and aggregates them
+ /// using the specified IReduceFunction.
+ /// </summary>
+ /// <returns>The single aggregated data</returns>
+ T Reduce();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
new file mode 100644
index 0000000..c15ded6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI Operator used to send messages to be reduced by the ReduceReceiver.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IReduceSender<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Sends data to the operator's ReduceReceiver to be aggregated.
+ /// </summary>
+ /// <param name="data">The data to send</param>
+ void Send(T data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
new file mode 100644
index 0000000..7aa4e81
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI operator used to receive a sublist of messages sent
+ /// from the IScatterSender.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IScatterReceiver<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Receive a sublist of messages sent from the IScatterSender.
+ /// </summary>
+ /// <returns>The sublist of messages</returns>
+ List<T> Receive();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
new file mode 100644
index 0000000..c5a2c3d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Operators
+{
+ /// <summary>
+ /// MPI operator used to scatter a list of elements to all
+ /// of the IScatterReceivers.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface IScatterSender<T> : IMpiOperator<T>
+ {
+ /// <summary>
+ /// Split up the list of elements evenly and scatter each chunk
+ /// to the IScatterReceivers.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ void Send(List<T> elements);
+
+ /// <summary>
+ /// Split up the list of elements and scatter each chunk
+ /// to the IScatterReceivers. Each receiver will receive
+ /// a sublist of the specified size.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ /// <param name="count">The size of each sublist</param>
+ void Send(List<T> elements, int count);
+
+ /// <summary>
+ /// Split up the list of elements and scatter each chunk
+ /// to the IScatterReceivers in the specified task order.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ /// <param name="order">The list of task identifiers representing
+ /// the order in which to scatter each sublist</param>
+ void Send(List<T> elements, List<string> order);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
new file mode 100644
index 0000000..904e4ef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// The specification used to define Broadcast Operators.
+ /// </summary>
+ public class BroadcastOperatorSpec<T> : IOperatorSpec<T>
+ {
+ /// <summary>
+ /// Create a new BroadcastOperatorSpec.
+ /// </summary>
+ /// <param name="senderId">The identifier of the root sending Task.</param>
+ /// <param name="codecType">The codec used to serialize messages.</param>
+ public BroadcastOperatorSpec(string senderId, ICodec<T> codecType)
+ {
+ SenderId = senderId;
+ Codec = codecType;
+ }
+
+ /// <summary>
+ /// Returns the identifier of the Task that will broadcast data to other Tasks.
+ /// </summary>
+ public string SenderId { get; private set; }
+
+ /// <summary>
+ /// Returns the ICodec used to serialize messages.
+ /// </summary>
+ public ICodec<T> Codec { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
new file mode 100644
index 0000000..4374ab5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI Operator used to receive broadcast messages.
+ /// </summary>
+ /// <typeparam name="T">The type of message being sent.</typeparam>
+ public class BroadcastReceiver<T> : IBroadcastReceiver<T>
+ {
+ private const int DefaultVersion = 1;
+
+ private ICommunicationGroupNetworkObserver _networkHandler;
+ private OperatorTopology<T> _topology;
+
+ /// <summary>
+ /// Creates a new BroadcastReceiver.
+ /// </summary>
+ /// <param name="operatorName">The operator identifier</param>
+ /// <param name="groupName">The name of the CommunicationGroup that the
+ /// operator belongs to</param>
+ /// <param name="topology">The node's topology graph</param>
+ /// <param name="networkHandler">The incoming message handler</param>
+ [Inject]
+ public BroadcastReceiver(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>
+ /// Returns the operator identifier.
+ /// </summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>
+ /// Returns the name of the CommunicationGroup that the operator belongs to.
+ /// </summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Returns the operator version.
+ /// </summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Receive a message from parent BroadcastSender.
+ /// </summary>
+ /// <returns>The incoming message</returns>
+ public T Receive()
+ {
+ return _topology.ReceiveFromParent();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
new file mode 100644
index 0000000..4e48428
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI Operator used to send messages to child Tasks.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class BroadcastSender<T> : IBroadcastSender<T>
+ {
+ private const int DefaultVersion = 1;
+
+ private ICommunicationGroupNetworkObserver _networkHandler;
+ private OperatorTopology<T> _topology;
+
+ /// <summary>
+ /// Creates a new BroadcastSender to send messages to other Tasks.
+ /// </summary>
+ /// <param name="operatorName">The identifier for the operator</param>
+ /// <param name="groupName">The name of the CommunicationGroup that the operator
+ /// belongs to</param>
+ /// <param name="topology">The node's topology graph</param>
+ /// <param name="networkHandler">The incoming message handler</param>
+ [Inject]
+ public BroadcastSender(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>
+ /// Returns the identifier for the MPI operator.
+ /// </summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>
+ /// Returns the name of the operator's CommunicationGroup.
+ /// </summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Returns the operator version.
+ /// </summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Send the data to all BroadcastReceivers.
+ /// </summary>
+ /// <param name="data">The data to send.</param>
+ public void Send(T data)
+ {
+ if (data == null)
+ {
+ throw new ArgumentNullException("data");
+ }
+
+ _topology.SendToChildren(data, MessageType.Data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
new file mode 100644
index 0000000..bc60055
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceFunction.cs
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ public class ReduceFunction<T> : IReduceFunction<T>
+ {
+ private Func<T, T, T> _reduceFunction;
+ private T _initialValue;
+
+ private ReduceFunction(Func<T, T, T> reduceFunction)
+ {
+ _reduceFunction = reduceFunction;
+ }
+
+ private ReduceFunction(Func<T, T, T> reduceFunction, T initialValue)
+ {
+ _reduceFunction = reduceFunction;
+ _initialValue = initialValue;
+ }
+
+ public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction)
+ {
+ return new ReduceFunction<T>(reduceFunction);
+ }
+
+ public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction, T initialValue)
+ {
+ return new ReduceFunction<T>(reduceFunction, initialValue);
+ }
+
+ public T Reduce(IEnumerable<T> elements)
+ {
+ if (_initialValue == null)
+ {
+ return elements.Aggregate(_reduceFunction);
+ }
+
+ return elements.Aggregate(_initialValue, _reduceFunction);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
new file mode 100644
index 0000000..37b6ce7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// The specification used to define Reduce MPI Operators.
+ /// </summary>
+ public class ReduceOperatorSpec<T> : IOperatorSpec<T>
+ {
+ /// <summary>
+ /// Creates a new ReduceOperatorSpec.
+ /// </summary>
+ /// <param name="receiverId">The identifier of the task that
+ /// will receive and reduce incoming messages.</param>
+ /// <param name="codec">The codec used for serializing messages.</param>
+ /// <param name="reduceFunction">The class used to aggregate all messages.</param>
+ public ReduceOperatorSpec(
+ string receiverId,
+ ICodec<T> codec,
+ IReduceFunction<T> reduceFunction)
+ {
+ ReceiverId = receiverId;
+ Codec = codec;
+ ReduceFunction = reduceFunction;
+ }
+
+ /// <summary>
+ /// Returns the identifier for the task that receives and reduces
+ /// incoming messages.
+ /// </summary>
+ public string ReceiverId { get; private set; }
+
+ /// <summary>
+ /// The codec used to serialize and deserialize messages.
+ /// </summary>
+ public ICodec<T> Codec { get; private set; }
+
+ /// <summary>
+ /// The class used to aggregate incoming messages.
+ /// </summary>
+ public IReduceFunction<T> ReduceFunction { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
new file mode 100644
index 0000000..3c722ef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI operator used to receive and reduce messages.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class ReduceReceiver<T> : IReduceReceiver<T>
+ {
+ private const int DefaultVersion = 1;
+
+ private ICommunicationGroupNetworkObserver _networkHandler;
+ private OperatorTopology<T> _topology;
+
+ /// <summary>
+ /// Creates a new ReduceReceiver.
+ /// </summary>
+ /// <param name="operatorName">The name of the reduce operator</param>
+ /// <param name="groupName">The name of the operator's CommunicationGroup</param>
+ /// <param name="topology">The task's operator topology graph</param>
+ /// <param name="networkHandler">Handles incoming messages from other tasks</param>
+ /// <param name="reduceFunction">The class used to aggregate all incoming messages</param>
+ [Inject]
+ public ReduceReceiver(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler,
+ IReduceFunction<T> reduceFunction)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+ ReduceFunction = reduceFunction;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>
+ /// Returns the name of the reduce operator
+ /// </summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>
+ /// Returns the name of the operator's CommunicationGroup.
+ /// </summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Returns the operator version.
+ /// </summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Returns the class used to reduce incoming messages sent by ReduceSenders.
+ /// </summary>
+ public IReduceFunction<T> ReduceFunction { get; private set; }
+
+ /// <summary>
+ /// Receives messages sent by all ReduceSenders and aggregates them
+ /// using the specified IReduceFunction.
+ /// </summary>
+ /// <returns>The single aggregated data</returns>
+ public T Reduce()
+ {
+ return _topology.ReceiveFromChildren(ReduceFunction);
+ }
+ }
+}