You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/11 23:58:07 UTC
[2/4] incubator-reef git commit: [REEF-150] Adding group
communication to REEF .Net
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
new file mode 100644
index 0000000..ffc5165
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI Operator used to send messages to be reduced by the ReduceReceiver.
+ /// Incoming messages for this operator are forwarded to its operator topology.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class ReduceSender<T> : IReduceSender<T>
+ {
+ private const int DefaultVersion = 1;
+
+ // Assigned once in the constructor and never replaced afterwards.
+ private readonly ICommunicationGroupNetworkObserver _networkHandler;
+ private readonly OperatorTopology<T> _topology;
+
+ /// <summary>
+ /// Creates a new ReduceSender, initializes its topology and registers the
+ /// message handler for this operator with the network handler.
+ /// </summary>
+ /// <param name="operatorName">The name of the reduce operator</param>
+ /// <param name="groupName">The name of the reduce operator's CommunicationGroup</param>
+ /// <param name="topology">The Task's operator topology graph</param>
+ /// <param name="networkHandler">The handler used to handle incoming messages</param>
+ [Inject]
+ public ReduceSender(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ // Messages registered under operatorName are passed on to the topology.
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>Returns the name of the reduce operator.</summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>Returns the name of the operator's CommunicationGroup.</summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>Returns the operator version.</summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Sends the data to the operator's ReduceReceiver to be aggregated.
+ /// </summary>
+ /// <param name="data">The data to send</param>
+ /// <exception cref="ArgumentNullException">Thrown when data is null</exception>
+ public void Send(T data)
+ {
+ if (data == null)
+ {
+ throw new ArgumentNullException("data");
+ }
+
+ // Handed to the topology as a MessageType.Data message for the parent.
+ _topology.SendToParent(data, MessageType.Data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
new file mode 100644
index 0000000..8219eb6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// Describes a Scatter MPI Operator: which task sends and which codec is used.
+ /// </summary>
+ /// <typeparam name="T">The message type handled by the codec</typeparam>
+ public class ScatterOperatorSpec<T> : IOperatorSpec<T>
+ {
+ /// <summary>
+ /// Returns the identifier for the task that splits and scatters a list
+ /// of messages to other tasks.
+ /// </summary>
+ public string SenderId { get; private set; }
+
+ /// <summary>
+ /// Returns the codec used to serialize and deserialize messages.
+ /// </summary>
+ public ICodec<T> Codec { get; private set; }
+
+ /// <summary>
+ /// Builds a ScatterOperatorSpec from a sender identifier and a codec.
+ /// </summary>
+ /// <param name="senderId">Identifier of the task that will send messages</param>
+ /// <param name="codec">Codec that serializes and deserializes
+ /// the messages</param>
+ public ScatterOperatorSpec(string senderId, ICodec<T> codec)
+ {
+ this.Codec = codec;
+ this.SenderId = senderId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
new file mode 100644
index 0000000..85b5c13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI operator used to receive a sublist of messages sent
+ /// from the IScatterSender.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class ScatterReceiver<T> : IScatterReceiver<T>
+ {
+ private const int DefaultVersion = 1;
+
+ private readonly ICommunicationGroupNetworkObserver _networkHandler;
+ private readonly OperatorTopology<T> _topology;
+
+ /// <summary>
+ /// Creates a new ScatterReceiver, initializes its topology and registers its message handler.
+ /// </summary>
+ /// <param name="operatorName">The name of the scatter operator</param>
+ /// <param name="groupName">The name of the operator's CommunicationGroup</param>
+ /// <param name="topology">The task's operator topology graph</param>
+ /// <param name="networkHandler">Handles incoming messages from other tasks</param>
+ [Inject]
+ public ScatterReceiver(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>
+ /// Returns the name of the scatter operator.
+ /// </summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>
+ /// Returns the name of the operator's CommunicationGroup.
+ /// </summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Returns the operator version.
+ /// </summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Never assigned by this class, so it is always null. NOTE(review): looks like a leftover from the reduce operators - confirm before removing.
+ /// </summary>
+ public IReduceFunction<T> ReduceFunction { get; private set; }
+
+ /// <summary>
+ /// Receive a sublist of messages sent from the IScatterSender.
+ /// </summary>
+ /// <returns>The sublist of messages</returns>
+ public List<T> Receive()
+ {
+ return _topology.ReceiveListFromParent();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
new file mode 100644
index 0000000..ee9e683
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI operator used to scatter a list of elements to all of the IScatterReceivers.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class ScatterSender<T> : IScatterSender<T>
+ {
+ private const int DefaultVersion = 1;
+
+ private readonly ICommunicationGroupNetworkObserver _networkHandler;
+ private readonly OperatorTopology<T> _topology;
+
+ /// <summary>Creates a new ScatterSender, initializes its topology and registers its message handler.</summary>
+ /// <param name="operatorName">The name of the scatter operator</param>
+ /// <param name="groupName">The name of the operator's Communication Group</param>
+ /// <param name="topology">The operator topology</param>
+ /// <param name="networkHandler">The network handler</param>
+ [Inject]
+ public ScatterSender(
+ [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+ OperatorTopology<T> topology,
+ ICommunicationGroupNetworkObserver networkHandler)
+ {
+ OperatorName = operatorName;
+ GroupName = groupName;
+ Version = DefaultVersion;
+
+ _networkHandler = networkHandler;
+ _topology = topology;
+ _topology.Initialize();
+
+ var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+ _networkHandler.Register(operatorName, msgHandler);
+ }
+
+ /// <summary>Returns the name of the scatter operator.</summary>
+ public string OperatorName { get; private set; }
+
+ /// <summary>Returns the name of the operator's CommunicationGroup.</summary>
+ public string GroupName { get; private set; }
+
+ /// <summary>Returns the operator version.</summary>
+ public int Version { get; private set; }
+
+ /// <summary>
+ /// Split up the list of elements evenly and scatter each chunk
+ /// to the IScatterReceivers.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ public void Send(List<T> elements)
+ {
+ _topology.ScatterToChildren(elements, MessageType.Data);
+ }
+
+ /// <summary>
+ /// Split up the list of elements and scatter each chunk
+ /// to the IScatterReceivers. Each receiver will receive
+ /// a sublist of the specified size.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ /// <param name="count">The size of each sublist</param>
+ public void Send(List<T> elements, int count)
+ {
+ _topology.ScatterToChildren(elements, count, MessageType.Data);
+ }
+
+ /// <summary>
+ /// Split up the list of elements and scatter each chunk
+ /// to the IScatterReceivers in the specified task order.
+ /// </summary>
+ /// <param name="elements">The list of elements to send.</param>
+ /// <param name="order">The list of task identifiers representing
+ /// the order in which to scatter each sublist</param>
+ public void Send(List<T> elements, List<string> order)
+ {
+ _topology.ScatterToChildren(elements, order, MessageType.Data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
new file mode 100644
index 0000000..c5ca60f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+ /// <summary>
+ /// MPI operator used to do point-to-point communication between named Tasks.
+ /// </summary>
+ public class Sender
+ {
+ private readonly INetworkService<GroupCommunicationMessage> _networkService;
+ private readonly IIdentifierFactory _idFactory;
+
+ /// <summary>Creates a new Sender.</summary>
+ /// <param name="networkService">The network services used to send messages.</param>
+ /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param>
+ [Inject]
+ public Sender(
+ NetworkService<GroupCommunicationMessage> networkService,
+ IIdentifierFactory idFactory)
+ {
+ _networkService = networkService;
+ _idFactory = idFactory;
+ }
+
+ /// <summary>
+ /// Send the GroupCommunicationMessage to the Task whose name is included in the message.
+ /// </summary>
+ /// <param name="message">The message to send.</param>
+ /// <exception cref="ArgumentNullException">Thrown when message is null</exception>
+ /// <exception cref="ArgumentException">Thrown when message.Destination is null or empty</exception>
+ public void Send(GroupCommunicationMessage message)
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException("message");
+ }
+ if (string.IsNullOrEmpty(message.Destination))
+ {
+ throw new ArgumentException("Message destination cannot be null or empty");
+ }
+
+ IIdentifier destId = _idFactory.Create(message.Destination);
+ // NOTE(review): a connection is requested per message and never closed here - confirm the network service owns or reuses it.
+ var conn = _networkService.NewConnection(destId);
+ conn.Open();
+ conn.Write(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
new file mode 100644
index 0000000..9b96a9a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+ /// <summary>
+ /// Used by Tasks to fetch MPI Operators in the group configured by the driver; the default implementation is CommunicationGroupClient.
+ /// </summary>
+ [DefaultImplementation(typeof(CommunicationGroupClient))]
+ public interface ICommunicationGroupClient
+ {
+ /// <summary>
+ /// Returns the Communication Group name.
+ /// </summary>
+ string GroupName { get; }
+
+ /// <summary>
+ /// Gets the BroadcastSender with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Broadcast operator</param>
+ /// <returns>The BroadcastSender</returns>
+ IBroadcastSender<T> GetBroadcastSender<T>(string operatorName);
+
+ /// <summary>
+ /// Gets the BroadcastReceiver with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Broadcast operator</param>
+ /// <returns>The BroadcastReceiver</returns>
+ IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName);
+
+ /// <summary>
+ /// Gets the ReduceSender with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Reduce operator</param>
+ /// <returns>The ReduceSender</returns>
+ IReduceSender<T> GetReduceSender<T>(string operatorName);
+
+ /// <summary>
+ /// Gets the ReduceReceiver with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Reduce operator</param>
+ /// <returns>The ReduceReceiver</returns>
+ IReduceReceiver<T> GetReduceReceiver<T>(string operatorName);
+
+ /// <summary>
+ /// Gets the ScatterSender with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Scatter operator</param>
+ /// <returns>The ScatterSender</returns>
+ IScatterSender<T> GetScatterSender<T>(string operatorName);
+
+ /// <summary>
+ /// Gets the ScatterReceiver with the given name and message type.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ /// <param name="operatorName">The name of the Scatter operator</param>
+ /// <returns>The ScatterReceiver</returns>
+ IScatterReceiver<T> GetScatterReceiver<T>(string operatorName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
new file mode 100644
index 0000000..d3034fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+ /// <summary>
+ /// Observer of GroupCommunicationMessages sent to this Communication Group; the default implementation is CommunicationGroupNetworkObserver.
+ /// </summary>
+ [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
+ public interface ICommunicationGroupNetworkObserver : IObserver<GroupCommunicationMessage>
+ {
+ /// <summary>
+ /// Registers the handler with the CommunicationGroupNetworkObserver.
+ /// Messages that are to be sent to the operator specified by operatorName
+ /// are handled by the given observer.
+ /// </summary>
+ /// <param name="operatorName">The name of the operator whose handler
+ /// will be invoked</param>
+ /// <param name="observer">The handler to invoke when messages are sent
+ /// to the operator specified by operatorName</param>
+ void Register(string operatorName, IObserver<GroupCommunicationMessage> observer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
new file mode 100644
index 0000000..2592b04
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+ /// <summary>
+ /// Disposable entry point used by Tasks to fetch CommunicationGroupClients; the default implementation is MpiClient.
+ /// </summary>
+ [DefaultImplementation(typeof(MpiClient))]
+ public interface IMpiClient : IDisposable
+ {
+ /// <summary>
+ /// Gets the CommunicationGroupClient with the given group name.
+ /// </summary>
+ /// <param name="groupName">The name of the CommunicationGroupClient</param>
+ /// <returns>The configured CommunicationGroupClient</returns>
+ ICommunicationGroupClient GetCommunicationGroup(string groupName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
new file mode 100644
index 0000000..5fe948c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Codec;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+ /// <summary>
+ /// Observer of all incoming NsMessages carrying GroupCommunicationMessages for this Task; the default implementation is MpiNetworkObserver.
+ /// </summary>
+ [DefaultImplementation(typeof(MpiNetworkObserver))]
+ public interface IMpiNetworkObserver : IObserver<NsMessage<GroupCommunicationMessage>>
+ {
+ /// <summary>
+ /// Registers the network handler for the given CommunicationGroup.
+ /// When messages are sent to the specified group name, the given handler
+ /// will be invoked with that message.
+ /// </summary>
+ /// <param name="groupName">The group name for the network handler</param>
+ /// <param name="commGroupHandler">The network handler to invoke when
+ /// messages are sent to the given group.</param>
+ void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
new file mode 100644
index 0000000..e6d653d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Used by Tasks to fetch MPI Operators in the group configured by the driver.
+ /// </summary>
+ /// <remarks>
+ /// One injector per operator is built in the constructor from the serialized
+ /// operator configurations. Operator instances are created on first request
+ /// and cached by operator name afterwards.
+ /// </remarks>
+ public class CommunicationGroupClient : ICommunicationGroupClient
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupClient));
+
+     private readonly string _taskId;
+     private readonly string _driverId;
+
+     // Injector for each configured operator, keyed by operator name.
+     private readonly Dictionary<string, IInjector> _operatorInjectors;
+
+     // Operators that have already been instantiated, keyed by operator name.
+     private readonly Dictionary<string, object> _operators;
+
+     private readonly NetworkService<GroupCommunicationMessage> _networkService;
+     private readonly IMpiNetworkObserver _mpiNetworkHandler;
+     private readonly ICommunicationGroupNetworkObserver _commGroupNetworkHandler;
+
+     /// <summary>
+     /// Creates a new CommunicationGroupClient and registers a fresh
+     /// CommunicationGroupNetworkObserver for this group with the given
+     /// task-wide network observer.
+     /// </summary>
+     /// <param name="taskId">The identifier for this Task.</param>
+     /// <param name="groupName">The name of the CommunicationGroup</param>
+     /// <param name="driverId">The identifier for the driver</param>
+     /// <param name="operatorConfigs">The serialized operator configurations</param>
+     /// <param name="mpiNetworkObserver">The handler for all incoming messages
+     /// across all Communication Groups</param>
+     /// <param name="networkService">The network service used to send messages.</param>
+     /// <param name="configSerializer">Used to deserialize operator configuration.</param>
+     [Inject]
+     public CommunicationGroupClient(
+         [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+         [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+         [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+         [Parameter(typeof(MpiConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs,
+         IMpiNetworkObserver mpiNetworkObserver,
+         NetworkService<GroupCommunicationMessage> networkService,
+         AvroConfigurationSerializer configSerializer)
+     {
+         _taskId = taskId;
+         _driverId = driverId;
+         GroupName = groupName;
+
+         _operators = new Dictionary<string, object>();
+         _operatorInjectors = new Dictionary<string, IInjector>();
+
+         _networkService = networkService;
+         _mpiNetworkHandler = mpiNetworkObserver;
+         _commGroupNetworkHandler = new CommunicationGroupNetworkObserver();
+         _mpiNetworkHandler.Register(groupName, _commGroupNetworkHandler);
+
+         // Deserialize operator configuration and store each injector.
+         // When user requests the MPI Operator, use type information to
+         // create the instance.
+         foreach (string operatorConfigStr in operatorConfigs)
+         {
+             IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr);
+
+             IInjector injector = TangFactory.GetTang().NewInjector(operatorConfig);
+             string operatorName = injector.GetNamedInstance<MpiConfigurationOptions.OperatorName, string>(
+                 GenericType<MpiConfigurationOptions.OperatorName>.Class);
+             _operatorInjectors[operatorName] = injector;
+         }
+     }
+
+     /// <summary>
+     /// Returns the Communication Group name
+     /// </summary>
+     public string GroupName { get; private set; }
+
+     /// <summary>
+     /// Gets the BroadcastSender with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Broadcast operator</param>
+     /// <returns>The BroadcastSender</returns>
+     public IBroadcastSender<T> GetBroadcastSender<T>(string operatorName)
+     {
+         return GetOperatorInstance<BroadcastSender<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the BroadcastReceiver with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Broadcast operator</param>
+     /// <returns>The BroadcastReceiver</returns>
+     public IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName)
+     {
+         return GetOperatorInstance<BroadcastReceiver<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the ReduceSender with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Reduce operator</param>
+     /// <returns>The ReduceSender</returns>
+     public IReduceSender<T> GetReduceSender<T>(string operatorName)
+     {
+         return GetOperatorInstance<ReduceSender<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the ReduceReceiver with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Reduce operator</param>
+     /// <returns>The ReduceReceiver</returns>
+     public IReduceReceiver<T> GetReduceReceiver<T>(string operatorName)
+     {
+         return GetOperatorInstance<ReduceReceiver<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the ScatterSender with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Scatter operator</param>
+     /// <returns>The ScatterSender</returns>
+     public IScatterSender<T> GetScatterSender<T>(string operatorName)
+     {
+         return GetOperatorInstance<ScatterSender<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the ScatterReceiver with the given name and message type.
+     /// </summary>
+     /// <typeparam name="T">The message type</typeparam>
+     /// <param name="operatorName">The name of the Scatter operator</param>
+     /// <returns>The ScatterReceiver</returns>
+     public IScatterReceiver<T> GetScatterReceiver<T>(string operatorName)
+     {
+         return GetOperatorInstance<ScatterReceiver<T>>(operatorName);
+     }
+
+     /// <summary>
+     /// Gets the MPI operator with the specified name and type.
+     /// If the operator hasn't been instantiated yet, find the injector
+     /// associated with the given operator name and use the type information
+     /// to create a new operator of that type. Later calls with the same
+     /// name return the cached instance.
+     /// </summary>
+     /// <typeparam name="T">The type of operator to create</typeparam>
+     /// <param name="operatorName">The name of the operator</param>
+     /// <returns>The cached or newly created MPI Operator</returns>
+     /// <exception cref="ArgumentNullException">operatorName is null or empty</exception>
+     /// <exception cref="ArgumentException">No operator was configured under operatorName</exception>
+     /// <exception cref="InjectionException">The injector could not create an operator of type T</exception>
+     /// <exception cref="InvalidCastException">The operator was already created
+     /// under this name with a type that is not compatible with T</exception>
+     private T GetOperatorInstance<T>(string operatorName) where T : class
+     {
+         if (string.IsNullOrEmpty(operatorName))
+         {
+             throw new ArgumentNullException("operatorName");
+         }
+
+         // Single lookup instead of ContainsKey followed by the indexer.
+         IInjector injector;
+         if (!_operatorInjectors.TryGetValue(operatorName, out injector))
+         {
+             throw new ArgumentException(
+                 "Invalid operator name, no MPI operator is configured with name: " + operatorName,
+                 "operatorName");
+         }
+
+         object op;
+         if (!_operators.TryGetValue(operatorName, out op))
+         {
+             injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, _taskId);
+             injector.BindVolatileParameter(GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class, GroupName);
+             injector.BindVolatileInstance(GenericType<ICommunicationGroupNetworkObserver>.Class, _commGroupNetworkHandler);
+             injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, _networkService);
+             injector.BindVolatileInstance(GenericType<ICommunicationGroupClient>.Class, this);
+
+             try
+             {
+                 op = injector.GetInstance<T>();
+                 _operators[operatorName] = op;
+             }
+             catch (InjectionException)
+             {
+                 LOGGER.Log(Level.Error, "Cannot inject MPI operator: No known operator of type: {0}", typeof(T));
+                 throw;
+             }
+         }
+
+         // The cache is keyed by name only, so a second request with a different
+         // T can hit an instance of another type. Fail with an explanatory
+         // message instead of a bare cast failure.
+         T typedOp = op as T;
+         if (typedOp == null && op != null)
+         {
+             throw new InvalidCastException(string.Format(
+                 "MPI operator '{0}' was already created as {1} and cannot be returned as {2}",
+                 operatorName, op.GetType(), typeof(T)));
+         }
+
+         return typedOp;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
new file mode 100644
index 0000000..97ab082
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Handles incoming messages sent to this Communication Group.
+ /// Keeps one handler per operator name and forwards each message to the
+ /// handler registered for the message's operator.
+ /// </summary>
+ public class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
+ {
+     // Operator name -> handler that receives messages addressed to that operator.
+     private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers;
+
+     /// <summary>
+     /// Creates a new CommunicationGroupNetworkObserver.
+     /// </summary>
+     [Inject]
+     public CommunicationGroupNetworkObserver()
+     {
+         _handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
+     }
+
+     /// <summary>
+     /// Registers the handler with the CommunicationGroupNetworkObserver.
+     /// Messages that are to be sent to the operator specified by operatorName
+     /// are handled by the given observer. Registering the same operator name
+     /// again replaces the previous handler.
+     /// </summary>
+     /// <param name="operatorName">The name of the operator whose handler
+     /// will be invoked</param>
+     /// <param name="observer">The handler to invoke when messages are sent
+     /// to the operator specified by operatorName</param>
+     /// <exception cref="ArgumentNullException">operatorName is null or empty,
+     /// or observer is null</exception>
+     public void Register(string operatorName, IObserver<GroupCommunicationMessage> observer)
+     {
+         if (string.IsNullOrEmpty(operatorName))
+         {
+             throw new ArgumentNullException("operatorName");
+         }
+         if (observer == null)
+         {
+             throw new ArgumentNullException("observer");
+         }
+
+         _handlers[operatorName] = observer;
+     }
+
+     /// <summary>
+     /// Handles the incoming GroupCommunicationMessage sent to this Communication Group.
+     /// Looks for the operator that the message is being sent to and invokes its handler.
+     /// </summary>
+     /// <param name="message">The incoming message</param>
+     /// <exception cref="ArgumentNullException">message is null</exception>
+     /// <exception cref="ArgumentException">No handler is registered for the
+     /// message's operator name</exception>
+     public void OnNext(GroupCommunicationMessage message)
+     {
+         // Validate up front, as the other observers in this namespace do,
+         // rather than failing with a NullReferenceException below.
+         if (message == null)
+         {
+             throw new ArgumentNullException("message");
+         }
+
+         string operatorName = message.OperatorName;
+
+         IObserver<GroupCommunicationMessage> handler;
+         if (!_handlers.TryGetValue(operatorName, out handler))
+         {
+             throw new ArgumentException("No handler registered with the operator name: " + operatorName);
+         }
+
+         handler.OnNext(message);
+     }
+
+     /// <summary>
+     /// Intentionally a no-op: errors are not propagated to the operator handlers.
+     /// </summary>
+     /// <param name="error">The error reported by the upstream observable</param>
+     public void OnError(Exception error)
+     {
+     }
+
+     /// <summary>
+     /// Intentionally a no-op: completion is not propagated to the operator handlers.
+     /// </summary>
+     public void OnCompleted()
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
new file mode 100644
index 0000000..7cec022
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Used by Tasks to fetch CommunicationGroupClients.
+ /// All group clients are created eagerly in the constructor from the
+ /// serialized group configurations.
+ /// </summary>
+ public class MpiClient : IMpiClient
+ {
+     // Group name -> client for that Communication Group.
+     private readonly Dictionary<string, ICommunicationGroupClient> _commGroups;
+
+     private readonly INetworkService<GroupCommunicationMessage> _networkService;
+
+     /// <summary>
+     /// Creates a new MpiClient and registers the task ID with the Name Server.
+     /// </summary>
+     /// <param name="groupConfigs">The set of serialized Group Communication configurations</param>
+     /// <param name="taskId">The identifier for this task</param>
+     /// <param name="mpiNetworkObserver">The network handler to receive incoming messages
+     /// for this task</param>
+     /// <param name="networkService">The network service used to send messages</param>
+     /// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
+     [Inject]
+     public MpiClient(
+         [Parameter(typeof(MpiConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
+         [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+         IMpiNetworkObserver mpiNetworkObserver,
+         NetworkService<GroupCommunicationMessage> networkService,
+         AvroConfigurationSerializer configSerializer)
+     {
+         _commGroups = new Dictionary<string, ICommunicationGroupClient>();
+         _networkService = networkService;
+         networkService.Register(new StringIdentifier(taskId));
+
+         foreach (string serializedGroupConfig in groupConfigs)
+         {
+             IConfiguration groupConfig = configSerializer.FromString(serializedGroupConfig);
+
+             // Each group gets its own injector; the task-wide singletons are
+             // bound explicitly so every group client shares them.
+             IInjector injector = TangFactory.GetTang().NewInjector(groupConfig);
+             injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, taskId);
+             injector.BindVolatileInstance(GenericType<IMpiNetworkObserver>.Class, mpiNetworkObserver);
+             injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, networkService);
+
+             ICommunicationGroupClient commGroup = injector.GetInstance<ICommunicationGroupClient>();
+             _commGroups[commGroup.GroupName] = commGroup;
+         }
+     }
+
+     /// <summary>
+     /// Gets the CommunicationGroupClient for the given group name.
+     /// </summary>
+     /// <param name="groupName">The name of the CommunicationGroupClient</param>
+     /// <returns>The CommunicationGroupClient</returns>
+     /// <exception cref="ArgumentNullException">groupName is null or empty</exception>
+     /// <exception cref="ArgumentException">No group with that name was configured</exception>
+     public ICommunicationGroupClient GetCommunicationGroup(string groupName)
+     {
+         if (string.IsNullOrEmpty(groupName))
+         {
+             throw new ArgumentNullException("groupName");
+         }
+
+         // Single lookup instead of ContainsKey followed by the indexer.
+         ICommunicationGroupClient commGroup;
+         if (!_commGroups.TryGetValue(groupName, out commGroup))
+         {
+             throw new ArgumentException("No CommunicationGroupClient with name: " + groupName);
+         }
+
+         return commGroup;
+     }
+
+     /// <summary>
+     /// Disposes of the MpiClient's services.
+     /// The network service is disposed even when unregistering fails, so a
+     /// failed Unregister does not leak it.
+     /// </summary>
+     public void Dispose()
+     {
+         try
+         {
+             _networkService.Unregister();
+         }
+         finally
+         {
+             _networkService.Dispose();
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
new file mode 100644
index 0000000..baa2e5e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Handles all incoming messages for this Task.
+ /// Routes each message to the handler registered for the message's
+ /// Communication Group.
+ /// </summary>
+ public class MpiNetworkObserver : IMpiNetworkObserver
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(MpiNetworkObserver));
+
+     // Group name -> handler for messages addressed to that group.
+     private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _commGroupHandlers;
+
+     /// <summary>
+     /// Creates a new MpiNetworkObserver.
+     /// </summary>
+     [Inject]
+     public MpiNetworkObserver()
+     {
+         _commGroupHandlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
+     }
+
+     /// <summary>
+     /// Handles the incoming NsMessage for this Task.
+     /// Delegates the first GroupCommunicationMessage in the NsMessage to the
+     /// correct CommunicationGroupNetworkObserver.
+     /// </summary>
+     /// <param name="nsMessage">The incoming network service message</param>
+     /// <exception cref="ArgumentNullException">nsMessage is null</exception>
+     /// <exception cref="InvalidOperationException">nsMessage carries no data</exception>
+     /// <exception cref="KeyNotFoundException">No handler is registered for the
+     /// message's group</exception>
+     public void OnNext(NsMessage<GroupCommunicationMessage> nsMessage)
+     {
+         if (nsMessage == null)
+         {
+             throw new ArgumentNullException("nsMessage");
+         }
+
+         // Only the extraction is guarded, so that exceptions thrown by the
+         // downstream handler are not mistaken for (and logged as) an empty message.
+         GroupCommunicationMessage gcm;
+         try
+         {
+             gcm = nsMessage.Data.First();
+         }
+         catch (InvalidOperationException)
+         {
+             LOGGER.Log(Level.Error, "Mpi Network Handler received message with no data");
+             throw;
+         }
+
+         IObserver<GroupCommunicationMessage> handler;
+         if (!_commGroupHandlers.TryGetValue(gcm.GroupName, out handler))
+         {
+             LOGGER.Log(Level.Error, "Mpi Network Handler received message for nonexistant group");
+             throw new KeyNotFoundException("No handler registered for the group name: " + gcm.GroupName);
+         }
+
+         handler.OnNext(gcm);
+     }
+
+     /// <summary>
+     /// Registers the network handler for the given CommunicationGroup.
+     /// When messages are sent to the specified group name, the given handler
+     /// will be invoked with that message. Registering the same group name
+     /// again replaces the previous handler.
+     /// </summary>
+     /// <param name="groupName">The group name for the network handler</param>
+     /// <param name="commGroupHandler">The network handler to invoke when
+     /// messages are sent to the given group.</param>
+     /// <exception cref="ArgumentNullException">groupName is null or empty,
+     /// or commGroupHandler is null</exception>
+     public void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler)
+     {
+         if (string.IsNullOrEmpty(groupName))
+         {
+             throw new ArgumentNullException("groupName");
+         }
+         if (commGroupHandler == null)
+         {
+             throw new ArgumentNullException("commGroupHandler");
+         }
+
+         _commGroupHandlers[groupName] = commGroupHandler;
+     }
+
+     /// <summary>
+     /// Intentionally a no-op: errors are not propagated to the group handlers.
+     /// </summary>
+     /// <param name="error">The error reported by the upstream observable</param>
+     public void OnError(Exception error)
+     {
+     }
+
+     /// <summary>
+     /// Intentionally a no-op: completion is not propagated to the group handlers.
+     /// </summary>
+     public void OnCompleted()
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
new file mode 100644
index 0000000..f4c7a60
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using System.Collections.Concurrent;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Per-Task inbox: buffers every incoming message sent by one particular Task.
+ /// </summary>
+ internal class NodeStruct
+ {
+     private BlockingCollection<GroupCommunicationMessage> _messageQueue;
+
+     /// <summary>
+     /// Creates a new NodeStruct with an empty message queue.
+     /// </summary>
+     /// <param name="id">The Task identifier</param>
+     public NodeStruct(string id)
+     {
+         _messageQueue = new BlockingCollection<GroupCommunicationMessage>();
+         Identifier = id;
+     }
+
+     /// <summary>
+     /// The identifier of the Task whose messages are held in the queue.
+     /// </summary>
+     public string Identifier { get; private set; }
+
+     /// <summary>
+     /// Removes the oldest queued message and returns its payload.
+     /// Blocks until a message is available.
+     /// </summary>
+     /// <returns>The data of the first available message.</returns>
+     public byte[][] GetData()
+     {
+         GroupCommunicationMessage next = _messageQueue.Take();
+         return next.Data;
+     }
+
+     /// <summary>
+     /// Appends an incoming message to the queue.
+     /// </summary>
+     /// <param name="gcm">The incoming message</param>
+     public void AddData(GroupCommunicationMessage gcm)
+     {
+         _messageQueue.Add(gcm);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
new file mode 100644
index 0000000..8752203
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// Contains the Operator's topology graph.
+ /// Used to send or receive messages to/from operators in the same
+ /// Communication Group.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class OperatorTopology<T> : IObserver<GroupCommunicationMessage>
+ {
+ // NOTE(review): not referenced in the visible part of this class; the unit is presumably milliseconds — confirm.
+ private const int DefaultTimeout = 10000;
+ // Value passed to WaitForTaskRegistration by Initialize() for the parent and for each child.
+ private const int RetryCount = 5;
+
+ private static Logger LOGGER = Logger.GetLogger(typeof(OperatorTopology<>));
+
+ // Identity of this operator, copied from the injected constructor parameters.
+ private string _groupName;
+ private string _operatorName;
+ private string _selfId;
+ private string _driverId;
+
+ // Node for the root Task; null when this Task is itself the root (see constructor).
+ private NodeStruct _parent;
+ // Nodes for the child Tasks; only populated when this Task is the root.
+ private List<NodeStruct> _children;
+ // Task identifier -> node, for the parent or the children registered in the constructor.
+ private Dictionary<string, NodeStruct> _idToNodeMap;
+ // Decodes received byte arrays into messages of type T.
+ private ICodec<T> _codec;
+ // Taken from the injected network service's NamingClient property.
+ private INameClient _nameClient;
+ private Sender _sender;
+ // OnNext adds the sending node here each time it accepts a message.
+ private BlockingCollection<NodeStruct> _nodesWithData;
+
+ /// <summary>
+ /// Creates a new OperatorTopology object.
+ /// When this Task is the root, one node is created per child Task and there
+ /// is no parent; otherwise the only node created is the parent (root) node
+ /// and the child identifiers are not used.
+ /// </summary>
+ /// <param name="operatorName">The name of the MPI Operator</param>
+ /// <param name="groupName">The name of the operator's Communication Group</param>
+ /// <param name="taskId">The operator's Task identifier</param>
+ /// <param name="driverId">The identifier for the driver</param>
+ /// <param name="rootId">The identifier for the root Task in the topology graph</param>
+ /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
+ /// <param name="networkService">The network service</param>
+ /// <param name="codec">The codec used to serialize and deserialize messages</param>
+ /// <param name="sender">The Sender used to do point to point communication</param>
+ [Inject]
+ public OperatorTopology(
+     [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+     [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+     [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+     [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
+     [Parameter(typeof(MpiConfigurationOptions.TopologyRootTaskId))] string rootId,
+     [Parameter(typeof(MpiConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
+     NetworkService<GroupCommunicationMessage> networkService,
+     ICodec<T> codec,
+     Sender sender)
+ {
+     _operatorName = operatorName;
+     _groupName = groupName;
+     _selfId = taskId;
+     _driverId = driverId;
+     _codec = codec;
+     _nameClient = networkService.NamingClient;
+     _sender = sender;
+     _nodesWithData = new BlockingCollection<NodeStruct>();
+     _children = new List<NodeStruct>();
+     _idToNodeMap = new Dictionary<string, NodeStruct>();
+
+     bool isRoot = _selfId.Equals(rootId);
+     if (!isRoot)
+     {
+         // Non-root Tasks only know about the root, which acts as their parent.
+         _parent = new NodeStruct(rootId);
+         _idToNodeMap[rootId] = _parent;
+         return;
+     }
+
+     // The root has no parent and tracks one node per child Task.
+     _parent = null;
+     foreach (string childId in childIds)
+     {
+         NodeStruct childNode = new NodeStruct(childId);
+         _children.Add(childNode);
+         _idToNodeMap[childId] = childNode;
+     }
+ }
+
+ /// <summary>
+ /// Initializes operator topology.
+ /// Calls WaitForTaskRegistration for the parent Task (if there is one) and
+ /// then for every child Task, so that all neighbours have registered
+ /// themselves with the Name Service before the operator is used.
+ /// </summary>
+ public void Initialize()
+ {
+     using (LOGGER.LogFunction("OperatorTopology::Initialize"))
+     {
+         if (_parent != null)
+         {
+             WaitForTaskRegistration(_parent.Identifier, RetryCount);
+         }
+
+         // Enumerating an empty list is a no-op, so no separate emptiness check is needed.
+         foreach (NodeStruct childNode in _children)
+         {
+             WaitForTaskRegistration(childNode.Identifier, RetryCount);
+         }
+     }
+ }
+
+ /// <summary>
+ /// Handles the incoming GroupCommunicationMessage.
+ /// Resolves the node of the sending Task, records that node as having
+ /// data, and appends the message to the node's queue.
+ /// </summary>
+ /// <param name="gcm">The incoming message</param>
+ public void OnNext(GroupCommunicationMessage gcm)
+ {
+     if (gcm == null)
+     {
+         throw new ArgumentNullException("gcm");
+     }
+
+     var senderId = gcm.Source;
+     if (senderId == null)
+     {
+         throw new ArgumentException("Message must have a source");
+     }
+
+     NodeStruct senderNode = FindNode(senderId);
+     if (senderNode == null)
+     {
+         throw new IllegalStateException("Received message from invalid task id: " + senderId);
+     }
+
+     // The node is announced first and the message is queued second.
+     _nodesWithData.Add(senderNode);
+     senderNode.AddData(gcm);
+ }
+
+ /// <summary>
+ /// Sends the message to the parent Task.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <param name="type">The message type; forwarded unchanged to SendToNode</param>
+ /// <exception cref="ArgumentException">This node has no parent</exception>
+ /// <exception cref="ArgumentNullException">message is null</exception>
+ public void SendToParent(T message, MessageType type)
+ {
+     if (_parent == null)
+     {
+         throw new ArgumentException("No parent for node");
+     }
+     // Same validation as SendToChildren; for value-type T this comparison is always false.
+     if (message == null)
+     {
+         throw new ArgumentNullException("message");
+     }
+
+     // Honor the caller's message type instead of hard-coding MessageType.Data.
+     SendToNode(message, type, _parent);
+ }
+
+ /// <summary>
+ /// Sends the message to all child Tasks.
+ /// Does nothing when there are no children.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <param name="type">The message type; forwarded unchanged to SendToNode</param>
+ /// <exception cref="ArgumentNullException">message is null</exception>
+ public void SendToChildren(T message, MessageType type)
+ {
+     if (message == null)
+     {
+         throw new ArgumentNullException("message");
+     }
+
+     foreach (NodeStruct child in _children)
+     {
+         // Honor the caller's message type instead of hard-coding MessageType.Data.
+         SendToNode(message, type, child);
+     }
+ }
+
+ /// <summary>
+ /// Scatters the list of messages across the child Tasks.
+ /// The chunk size handed to ScatterHelper is the message count divided by
+ /// the number of children, rounded up.
+ /// </summary>
+ /// <param name="messages">The list of messages to scatter</param>
+ /// <param name="type">The message type (not used by this overload)</param>
+ public void ScatterToChildren(List<T> messages, MessageType type)
+ {
+     if (messages == null)
+     {
+         throw new ArgumentNullException("messages");
+     }
+     if (_children.Count < 1)
+     {
+         throw new ArgumentException("Cannot scatter, no children available");
+     }
+
+     double messagesPerChild = ((double) messages.Count) / _children.Count;
+     int chunkSize = (int) Math.Ceiling(messagesPerChild);
+     ScatterHelper(messages, _children, chunkSize);
+ }
+
+ /// <summary>
+ /// Scatters the list of messages across the child Tasks using a
+ /// caller-supplied chunk size, which is passed straight to ScatterHelper.
+ /// </summary>
+ /// <param name="messages">The list of messages to scatter</param>
+ /// <param name="count">The size of each sublist; must be positive</param>
+ /// <param name="type">The message type (not used by this overload)</param>
+ public void ScatterToChildren(List<T> messages, int count, MessageType type)
+ {
+     if (messages == null)
+     {
+         throw new ArgumentNullException("messages");
+     }
+     if (count < 1)
+     {
+         throw new ArgumentException("Count must be positive");
+     }
+
+     ScatterHelper(messages, _children, count);
+ }
+
+ /// <summary>
+ /// Scatters the list of messages across the child Tasks in the specified order.
+ /// The chunk size handed to ScatterHelper is the message count divided by
+ /// the number of children, rounded up.
+ /// </summary>
+ /// <param name="messages">The list of messages to scatter</param>
+ /// <param name="order">The Task identifiers in the order in which they
+ /// should receive their sublists; must contain one entry per child</param>
+ /// <param name="type">The message type (not used by this overload)</param>
+ /// <exception cref="ArgumentNullException">messages is null</exception>
+ /// <exception cref="ArgumentException">order is null, its size differs from
+ /// the number of children, or there are no children</exception>
+ /// <exception cref="IllegalStateException">order names a Task that is not
+ /// part of this topology</exception>
+ public void ScatterToChildren(List<T> messages, List<string> order, MessageType type)
+ {
+     if (messages == null)
+     {
+         throw new ArgumentNullException("messages");
+     }
+     if (order == null || order.Count != _children.Count)
+     {
+         throw new ArgumentException("order cannot be null and must have the same number of elements as child tasks");
+     }
+     // Same guard as the unordered overload; without it the chunk size below
+     // would be computed by dividing by zero.
+     if (_children.Count <= 0)
+     {
+         throw new ArgumentException("Cannot scatter, no children available");
+     }
+
+     List<NodeStruct> nodes = new List<NodeStruct>(order.Count);
+     foreach (string taskId in order)
+     {
+         NodeStruct node = FindNode(taskId);
+         if (node == null)
+         {
+             // This is the send path, so the message must not claim a message was received.
+             throw new IllegalStateException("Cannot scatter to invalid task id: " + taskId);
+         }
+
+         nodes.Add(node);
+     }
+
+     int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count);
+     ScatterHelper(messages, nodes, count);
+ }
+
+ /// <summary>
+ /// Reads the parent Task's incoming data and decodes it into a single
+ /// message. The data must consist of exactly one encoded message.
+ /// </summary>
+ /// <returns>The parent Task's message</returns>
+ public T ReceiveFromParent()
+ {
+     byte[][] payload = ReceiveFromNode(_parent, true);
+
+     bool hasSingleMessage = payload != null && payload.Length == 1;
+     if (!hasSingleMessage)
+     {
+         throw new InvalidOperationException("Cannot receive data from parent node");
+     }
+
+     byte[] encoded = payload[0];
+     return _codec.Decode(encoded);
+ }
+
+ /// <summary>
+ /// Reads the parent Task's incoming data and decodes every element of
+ /// it. The data must contain at least one encoded message.
+ /// </summary>
+ /// <returns>The parent Task's list of messages</returns>
+ public List<T> ReceiveListFromParent()
+ {
+     byte[][] payload = ReceiveFromNode(_parent, true);
+     if (payload == null || payload.Length == 0)
+     {
+         throw new InvalidOperationException("Cannot receive data from parent node");
+     }
+
+     // Decode the elements one by one, keeping their order
+     List<T> decoded = new List<T>();
+     foreach (byte[] encoded in payload)
+     {
+         decoded.Add(_codec.Decode(encoded));
+     }
+
+     return decoded;
+ }
+
+ /// <summary>
+ /// Collects one message per child Task and combines the collected
+ /// messages with the given IReduceFunction. Keeps reading nodes that
+ /// have data until every child identifier has been seen.
+ /// </summary>
+ /// <param name="reduceFunction">The class used to reduce messages</param>
+ /// <returns>The result of reducing messages</returns>
+ public T ReceiveFromChildren(IReduceFunction<T> reduceFunction)
+ {
+     if (reduceFunction == null)
+     {
+         throw new ArgumentNullException("reduceFunction");
+     }
+
+     List<T> messages = new List<T>();
+
+     // Identifiers of the children that have not delivered a message yet
+     IEnumerable<string> childIds = from child in _children select child.Identifier;
+     HashSet<string> pendingChildIds = new HashSet<string>(childIds);
+
+     while (pendingChildIds.Count != 0)
+     {
+         NodeStruct sender = GetNodeWithData();
+         byte[][] payload = ReceiveFromNode(sender, false);
+
+         bool hasSingleMessage = payload != null && payload.Length == 1;
+         if (!hasSingleMessage)
+         {
+             throw new InvalidOperationException("Received invalid data from child with id: " + sender.Identifier);
+         }
+
+         messages.Add(_codec.Decode(payload[0]));
+         pendingChildIds.Remove(sender.Identifier);
+     }
+
+     return reduceFunction.Reduce(messages);
+ }
+
+ /// <summary>
+ /// Error notification callback. Currently a no-op: the error is neither
+ /// logged nor rethrown.
+ /// NOTE(review): presumably part of an observer contract implemented by
+ /// this class - confirm against the class declaration.
+ /// </summary>
+ /// <param name="error">The reported error (unused)</param>
+ public void OnError(Exception error)
+ {
+ }
+
+ /// <summary>
+ /// Completion notification callback. Currently a no-op.
+ /// NOTE(review): presumably part of an observer contract implemented by
+ /// this class - confirm against the class declaration.
+ /// </summary>
+ public void OnCompleted()
+ {
+ }
+
+ /// <summary>
+ /// Get a node containing an incoming message. The wait is cancelled
+ /// after DefaultTimeout.
+ /// </summary>
+ /// <returns>A NodeStruct with incoming data.</returns>
+ /// <remarks>
+ /// OperationCanceledException, ObjectDisposedException and
+ /// InvalidOperationException raised while waiting are logged and rethrown.
+ /// </remarks>
+ private NodeStruct GetNodeWithData()
+ {
+     // A CancellationTokenSource created with a timeout is disposable and
+     // should be released as soon as the wait is over.
+     using (CancellationTokenSource timeoutSource = new CancellationTokenSource(DefaultTimeout))
+     {
+         try
+         {
+             return _nodesWithData.Take(timeoutSource.Token);
+         }
+         catch (OperationCanceledException)
+         {
+             LOGGER.Log(Level.Error, "No data to read from child");
+             throw;
+         }
+         catch (ObjectDisposedException)
+         {
+             LOGGER.Log(Level.Error, "No data to read from child");
+             throw;
+         }
+         catch (InvalidOperationException)
+         {
+             LOGGER.Log(Level.Error, "No data to read from child");
+             throw;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Encodes a single message, wraps it in a GroupCommunicationMessage
+ /// addressed to the given node and hands it to the sender.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <param name="msgType">The message type</param>
+ /// <param name="node">The NodeStruct representing the Task to send to</param>
+ private void SendToNode(T message, MessageType msgType, NodeStruct node)
+ {
+     string destinationId = node.Identifier;
+     byte[] payload = _codec.Encode(message);
+
+     GroupCommunicationMessage outgoing = new GroupCommunicationMessage(
+         _groupName,
+         _operatorName,
+         _selfId,
+         destinationId,
+         payload,
+         msgType);
+
+     _sender.Send(outgoing);
+ }
+
+ /// <summary>
+ /// Encodes every message of the list, wraps the encoded messages in one
+ /// GroupCommunicationMessage addressed to the given node and hands it to
+ /// the sender.
+ /// </summary>
+ /// <param name="messages">The list of messages to send</param>
+ /// <param name="msgType">The message type</param>
+ /// <param name="node">The NodeStruct representing the Task to send to</param>
+ private void SendToNode(List<T> messages, MessageType msgType, NodeStruct node)
+ {
+     byte[][] payloads = (from message in messages
+                          select _codec.Encode(message)).ToArray();
+     string destinationId = node.Identifier;
+
+     GroupCommunicationMessage outgoing = new GroupCommunicationMessage(
+         _groupName,
+         _operatorName,
+         _selfId,
+         destinationId,
+         payloads,
+         msgType);
+
+     _sender.Send(outgoing);
+ }
+
+ /// <summary>
+ /// Sends consecutive sublists of at most count messages to the given
+ /// nodes, in the order the nodes are listed. Every sublist is sent as
+ /// MessageType.Data. Messages left over once every node has received a
+ /// sublist are not sent.
+ /// </summary>
+ /// <param name="messages">The list of messages to scatter</param>
+ /// <param name="order">The nodes to send to, in sending order</param>
+ /// <param name="count">The maximum size of each sublist</param>
+ /// <exception cref="ArgumentException">Thrown when count is not positive,
+ /// or when there are too few messages to give every node at least one.
+ /// In both cases nothing has been sent.</exception>
+ private void ScatterHelper(List<T> messages, List<NodeStruct> order, int count)
+ {
+     if (count <= 0)
+     {
+         throw new ArgumentException("Count must be positive");
+     }
+
+     // Every node must receive at least one message. All nodes but the last
+     // take a full sublist, so the last one is served only if more than
+     // (order.Count - 1) * count messages exist. Check this before sending
+     // anything, so that a failing scatter does not leave some nodes with
+     // data and others without. The product is computed as a long to avoid
+     // overflow for large counts.
+     if (order.Count > 0 && messages.Count <= (order.Count - 1) * (long) count)
+     {
+         throw new ArgumentException("Scatter count must be positive");
+     }
+
+     int offset = 0;
+     foreach (NodeStruct nodeStruct in order)
+     {
+         // The last sublist might be smaller than count if the number of
+         // messages is not evenly divisible by count
+         int size = Math.Min(count, messages.Count - offset);
+
+         List<T> sublist = messages.GetRange(offset, size);
+         SendToNode(sublist, MessageType.Data, nodeStruct);
+
+         offset += size;
+     }
+ }
+
+ /// <summary>
+ /// Reads the data held by the given NodeStruct and, when requested,
+ /// takes the NodeStruct out of the nodesWithData queue afterwards.
+ /// </summary>
+ /// <param name="node">The node to receive from</param>
+ /// <param name="removeFromQueue">Whether or not to remove the NodeStruct
+ /// from the nodesWithData queue</param>
+ /// <returns>The byte array message from the node</returns>
+ private byte[][] ReceiveFromNode(NodeStruct node, bool removeFromQueue)
+ {
+     byte[][] payload = node.GetData();
+
+     if (!removeFromQueue)
+     {
+         return payload;
+     }
+
+     _nodesWithData.Take(node);
+     return payload;
+ }
+
+ /// <summary>
+ /// Looks up the NodeStruct registered under the given Task identifier.
+ /// </summary>
+ /// <param name="identifier">The identifier of the Task</param>
+ /// <returns>The NodeStruct, or null if the identifier is unknown</returns>
+ private NodeStruct FindNode(string identifier)
+ {
+     NodeStruct match;
+     if (_idToNodeMap.TryGetValue(identifier, out match))
+     {
+         return match;
+     }
+
+     return null;
+ }
+
+ /// <summary>
+ /// Checks if the identifier is registered with the Name Server, looking
+ /// it up at most <paramref name="retries"/> times with a pause between
+ /// consecutive lookups.
+ /// Throws an exception if none of the lookups succeeds.
+ /// </summary>
+ /// <param name="identifier">The identifier to look up</param>
+ /// <param name="retries">The maximum number of lookups to perform</param>
+ /// <exception cref="IllegalStateException">Thrown when the identifier
+ /// could not be found within the allowed number of lookups</exception>
+ private void WaitForTaskRegistration(string identifier, int retries)
+ {
+     const int retryDelayMilliseconds = 500;
+
+     for (int attempt = 1; attempt <= retries; attempt++)
+     {
+         if (_nameClient.Lookup(identifier) != null)
+         {
+             return;
+         }
+
+         // Only wait (and announce a retry) when another lookup will follow;
+         // pausing after the final attempt would merely delay the failure.
+         if (attempt < retries)
+         {
+             Thread.Sleep(retryDelayMilliseconds);
+             LOGGER.Log(Level.Verbose, "Retry {0}: retrying lookup for node: {1}", attempt, identifier);
+         }
+     }
+
+     throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
new file mode 100644
index 0000000..5342410
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+ /// <summary>
+ /// Represents a graph of MPI Operators where there are only two levels of
+ /// nodes: the root and all children extending from the root.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class FlatTopology<T> : ITopology<T>
+ {
+     private readonly string _groupName;
+     private readonly string _operatorName;
+
+     private readonly string _rootId;
+     private readonly string _driverId;
+
+     // Child nodes keyed by task identifier. The root is kept in _root only.
+     private readonly Dictionary<string, TaskNode> _nodes;
+     private TaskNode _root;
+
+     /// <summary>
+     /// Creates a new FlatTopology.
+     /// </summary>
+     /// <param name="operatorName">The operator name</param>
+     /// <param name="groupName">The name of the topology's CommunicationGroup</param>
+     /// <param name="rootId">The root Task identifier</param>
+     /// <param name="driverId">The driver identifier</param>
+     /// <param name="operatorSpec">The operator specification</param>
+     public FlatTopology(
+         string operatorName,
+         string groupName,
+         string rootId,
+         string driverId,
+         IOperatorSpec<T> operatorSpec)
+     {
+         _groupName = groupName;
+         _operatorName = operatorName;
+         _rootId = rootId;
+         _driverId = driverId;
+
+         OperatorSpec = operatorSpec;
+
+         _nodes = new Dictionary<string, TaskNode>();
+     }
+
+     /// <summary>
+     /// Gets or sets the Operator specification
+     /// </summary>
+     public IOperatorSpec<T> OperatorSpec { get; set; }
+
+     /// <summary>
+     /// Gets the task configuration for the operator topology. The
+     /// configuration binds the codec, the root task id, the child task ids
+     /// and the operator implementation matching the role of the given task.
+     /// </summary>
+     /// <param name="taskId">The task identifier</param>
+     /// <returns>The task configuration</returns>
+     /// <exception cref="ArgumentNullException">Thrown when taskId is null</exception>
+     /// <exception cref="NotSupportedException">Thrown when the operator
+     /// specification is not a broadcast, reduce or scatter specification</exception>
+     public IConfiguration GetTaskConfiguration(string taskId)
+     {
+         if (taskId == null)
+         {
+             throw new ArgumentNullException("taskId");
+         }
+
+         IOperatorSpec<T> spec = OperatorSpec;
+
+         var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+             .BindImplementation(typeof(ICodec<T>), spec.Codec.GetType())
+             .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
+                 GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
+                 _rootId);
+
+         foreach (string tId in _nodes.Keys)
+         {
+             if (!tId.Equals(_rootId))
+             {
+                 confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                     GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                     tId);
+             }
+         }
+
+         // Resolve the concrete specification once; the checks below keep the
+         // original precedence: broadcast, then reduce, then scatter.
+         BroadcastOperatorSpec<T> broadcastSpec = spec as BroadcastOperatorSpec<T>;
+         ReduceOperatorSpec<T> reduceSpec = spec as ReduceOperatorSpec<T>;
+         ScatterOperatorSpec<T> scatterSpec = spec as ScatterOperatorSpec<T>;
+
+         if (broadcastSpec != null)
+         {
+             if (taskId.Equals(broadcastSpec.SenderId))
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+             }
+             else
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+             }
+         }
+         else if (reduceSpec != null)
+         {
+             confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+
+             if (taskId.Equals(reduceSpec.ReceiverId))
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+             }
+             else
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+             }
+         }
+         else if (scatterSpec != null)
+         {
+             if (taskId.Equals(scatterSpec.SenderId))
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+             }
+             else
+             {
+                 confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+             }
+         }
+         else
+         {
+             throw new NotSupportedException("Spec type not supported");
+         }
+
+         return confBuilder.Build();
+     }
+
+     /// <summary>
+     /// Adds a task to the topology graph. The task becomes the root if its
+     /// identifier equals the root identifier, and a child of the root
+     /// otherwise.
+     /// </summary>
+     /// <param name="taskId">The identifier of the task to add</param>
+     /// <exception cref="ArgumentNullException">Thrown when taskId is null or empty</exception>
+     /// <exception cref="ArgumentException">Thrown when the task has already been added</exception>
+     public void AddTask(string taskId)
+     {
+         if (string.IsNullOrEmpty(taskId))
+         {
+             throw new ArgumentNullException("taskId");
+         }
+
+         bool isRoot = taskId.Equals(_rootId);
+
+         // The root is tracked in _root rather than in _nodes, so it needs
+         // its own duplicate check; otherwise adding the root a second time
+         // would silently replace the existing root node.
+         if (_nodes.ContainsKey(taskId) || (isRoot && _root != null))
+         {
+             throw new ArgumentException("Task has already been added to the topology");
+         }
+
+         if (isRoot)
+         {
+             SetRootNode(_rootId);
+         }
+         else
+         {
+             AddChild(taskId);
+         }
+     }
+
+     /// <summary>
+     /// Creates the root node and links every child added so far to it.
+     /// </summary>
+     /// <param name="rootId">The identifier of the root task</param>
+     private void SetRootNode(string rootId)
+     {
+         TaskNode rootNode = new TaskNode(_groupName, _operatorName, rootId, _driverId);
+         _root = rootNode;
+
+         foreach (TaskNode childNode in _nodes.Values)
+         {
+             rootNode.AddChild(childNode);
+             childNode.SetParent(rootNode);
+         }
+     }
+
+     /// <summary>
+     /// Creates a child node and, if the root already exists, links the two.
+     /// Children added before the root are linked later by SetRootNode.
+     /// </summary>
+     /// <param name="childId">The identifier of the child task</param>
+     private void AddChild(string childId)
+     {
+         TaskNode childNode = new TaskNode(_groupName, _operatorName, childId, _driverId);
+         _nodes[childId] = childNode;
+
+         if (_root != null)
+         {
+             _root.AddChild(childNode);
+             childNode.SetParent(_root);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
new file mode 100644
index 0000000..c4dc9e6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+ /// <summary>
+ /// Represents a topology graph for IMpiOperators.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface ITopology<T>
+ {
+ /// <summary>
+ /// Gets the operator specification of the topology.
+ /// </summary>
+ IOperatorSpec<T> OperatorSpec { get; }
+
+ /// <summary>
+ /// Gets the task configuration for the operator topology.
+ /// </summary>
+ /// <param name="taskId">The task identifier</param>
+ /// <returns>The task configuration</returns>
+ IConfiguration GetTaskConfiguration(string taskId);
+
+ /// <summary>
+ /// Adds a task to the topology graph.
+ /// </summary>
+ /// <param name="taskId">The identifier of the task to add</param>
+ void AddTask(string taskId);
+ }
+}