You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/06/08 08:35:32 UTC

incubator-reef git commit: [REEF-353]: Derive OperatorTopology from an interface IOperatorTopology

Repository: incubator-reef
Updated Branches:
  refs/heads/master 653140fca -> 75c235823


[REEF-353]: Derive OperatorTopology from an interface IOperatorTopology

This addressed the issue by
  * Creating IOperatorTopology and making OperatorTopology derive from
    it.
  * In operators like BroadcastSender, BroadcastReceiver, etc., use
    IOperatorTopology instead of OperatorTopology.

JIRA:
  [REEF-353](https://issues.apache.org/jira/browse/REEF-353)

Pull Request:
  This closes #207


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/75c23582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/75c23582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/75c23582

Branch: refs/heads/master
Commit: 75c2358233f8562c7d958214399a7598dba98585
Parents: 653140f
Author: Dhruv <dh...@gmail.com>
Authored: Sat Jun 6 00:28:13 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Jun 8 16:33:22 2015 +1000

----------------------------------------------------------------------
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   4 +-
 .../Group/Operators/Impl/BroadcastSender.cs     |   4 +-
 .../Group/Operators/Impl/ReduceReceiver.cs      |   4 +-
 .../Group/Operators/Impl/ReduceSender.cs        |   4 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |   9 +-
 .../Group/Operators/Impl/ScatterSender.cs       |   4 +-
 .../Group/Task/IOperatorTopology.cs             | 100 +++++++++++++++++++
 .../Group/Task/Impl/OperatorTopology.cs         |  46 ++-------
 .../Org.Apache.REEF.Network.csproj              |   1 +
 9 files changed, 125 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index 65ed6b9..b53929e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -36,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     public class BroadcastReceiver<T> : IBroadcastReceiver<T>
     {
         private const int PipelineVersion = 2;
-        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiver<T>));
 
         /// <summary>
@@ -64,7 +64,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             PipelineDataConverter = dataConverter;
             _topology = topology;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 34f9cd2..4af01c7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastSender<T>));
         private const int PipelineVersion = 2;
-        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly IOperatorTopology<PipelineMessage<T>> _topology;
 
         /// <summary>
         /// Creates a new BroadcastSender to send messages to other Tasks.
@@ -66,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = PipelineVersion;
             PipelineDataConverter = dataConverter;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index 0c2fd94..bea9c83 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceReceiver<T>));
         private const int PipelineVersion = 2;
-        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
 
         /// <summary>
@@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index cd2049b..5f289da 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -38,7 +38,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceSender<T>));
         private const int PipelineVersion = 2;
-        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly IOperatorTopology<PipelineMessage<T>> _topology;
         private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
 
         /// <summary>
@@ -72,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             PipelineDataConverter = dataConverter;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index 13635bb..a9f4b10 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -18,6 +18,7 @@
  */
 
 using System.Collections.Generic;
+using System.Linq;
 using System.Reactive;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -35,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     public class ScatterReceiver<T> : IScatterReceiver<T>
     {
         private const int DefaultVersion = 1;
-        private readonly OperatorTopology<T> _topology;
+        private readonly IOperatorTopology<T> _topology;
 
         /// <summary>
         /// Creates a new ScatterReceiver.
@@ -59,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = DefaultVersion;
             _topology = topology;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)
@@ -94,9 +95,9 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <returns>The sublist of messages</returns>
         public List<T> Receive()
         {
-            List<T> elements = _topology.ReceiveListFromParent();
+            IList<T> elements = _topology.ReceiveListFromParent();
             _topology.ScatterToChildren(elements, MessageType.Data);
-            return elements;
+            return elements.ToList();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 47b6f6f..80bc84e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -36,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     public class ScatterSender<T> : IScatterSender<T>
     {
         private const int DefaultVersion = 1;
-        private readonly OperatorTopology<T> _topology;
+        private readonly IOperatorTopology<T> _topology;
 
         /// <summary>
         /// Creates a new ScatterSender.
@@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = DefaultVersion;
             _topology = topology;
 
-            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message));
             networkHandler.Register(operatorName, msgHandler);
 
             if (initialize)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
new file mode 100644
index 0000000..2e65080
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Contains the Operator's topology graph.
+    /// Used to send or receive messages to/from operators in the same
+    /// Communication Group.
+    /// </summary>
+    /// <typeparam name="T">The type of message</typeparam>
+    public interface IOperatorTopology<T>
+    {
+        /// <summary>
+        /// Sends the message to the parent Task.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="type">The message type</param>
+        void SendToParent(T message, MessageType type);
+
+        /// <summary>
+        /// Sends the message to all child Tasks.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="type">The message type</param>
+        void SendToChildren(T message, MessageType type);
+
+        /// <summary>
+        /// Splits the list of messages up evenly and sends each sublist
+        /// to the child Tasks.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="type">The message type</param>
+        void ScatterToChildren(IList<T> messages, MessageType type);
+
+        /// <summary>
+        /// Splits the list of messages up into chunks of the specified size 
+        /// and sends each sublist to the child Tasks.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="count">The size of each sublist</param>
+        /// <param name="type">The message type</param>
+        void ScatterToChildren(IList<T> messages, int count, MessageType type);
+
+        /// <summary>
+        /// Splits the list of messages up into chunks of the specified size 
+        /// and sends each sublist to the child Tasks in the specified order.
+        /// </summary>
+        /// <param name="messages">The list of messages to scatter</param>
+        /// <param name="order">The order to send messages</param>
+        /// <param name="type">The message type</param>
+        void ScatterToChildren(IList<T> messages, List<string> order, MessageType type);
+
+        /// <summary>
+        /// Receive an incoming message from the parent Task.
+        /// </summary>
+        /// <returns>The parent Task's message</returns>
+        T ReceiveFromParent();
+
+        /// <summary>
+        /// Receive a list of incoming messages from the parent Task.
+        /// </summary>
+        /// <returns>The parent Task's list of messages</returns>
+        IList<T> ReceiveListFromParent();
+
+        /// <summary>
+        /// Receives all messages from child Tasks and reduces them with the
+        /// given IReduceFunction.
+        /// </summary>
+        /// <param name="reduceFunction">The class used to reduce messages</param>
+        /// <returns>The result of reducing messages</returns>
+        T ReceiveFromChildren(IReduceFunction<T> reduceFunction);
+
+        /// <summary>
+        /// Checks if the node has children
+        /// </summary>
+        /// <returns>true if children are there, false otherwise</returns>
+        bool HasChildren();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 315cf64..dd11402 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Collections;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
@@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Communication Group.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public class OperatorTopology<T> : IObserver<GroupCommunicationMessage>
+    public class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GroupCommunicationMessage>
     {
         private const int DefaultTimeout = 50000;
         private const int RetryCount = 10;
@@ -213,7 +214,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         /// <param name="messages">The list of messages to scatter</param>
         /// <param name="type">The message type</param>
-        public void ScatterToChildren(List<T> messages, MessageType type)
+        public void ScatterToChildren(IList<T> messages, MessageType type)
         {
             if (messages == null)
             {
@@ -235,7 +236,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to scatter</param>
         /// <param name="count">The size of each sublist</param>
         /// <param name="type">The message type</param>
-        public void ScatterToChildren(List<T> messages, int count, MessageType type)
+        public void ScatterToChildren(IList<T> messages, int count, MessageType type)
         {
             if (messages == null)
             {
@@ -256,7 +257,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to scatter</param>
         /// <param name="order">The order to send messages</param>
         /// <param name="type">The message type</param>
-        public void ScatterToChildren(List<T> messages, List<string> order, MessageType type)
+        public void ScatterToChildren(IList<T> messages, List<string> order, MessageType type)
         {
             if (messages == null)
             {
@@ -302,7 +303,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Receive a list of incoming messages from the parent Task.
         /// </summary>
         /// <returns>The parent Task's list of messages</returns>
-        public List<T> ReceiveListFromParent()
+        public IList<T> ReceiveListFromParent()
         {
             byte[][] data = ReceiveFromNode(_parent);
             if (data == null || data.Length == 0)
@@ -428,35 +429,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         }
 
         /// <summary>
-        /// Get a node containing an incoming message.
-        /// </summary>
-        /// <returns>A NodeStruct with incoming data.</returns>
-        private NodeStruct GetNodeWithData()
-        {
-            CancellationTokenSource timeoutSource = new CancellationTokenSource(_timeout);
-
-            try
-            {
-                return _nodesWithData.Take(timeoutSource.Token);
-            }
-            catch (OperationCanceledException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
-            catch (InvalidOperationException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
-        }
-
-        /// <summary>
         /// Sends the message to the Task represented by the given NodeStruct.
         /// </summary>
         /// <param name="message">The message to send</param>
@@ -476,7 +448,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send to</param>
-        private void SendToNode(List<T> messages, MessageType msgType, NodeStruct node)
+        private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct node)
         {
             byte[][] encodedMessages = messages.Select(message => _codec.Encode(message)).ToArray();
             GroupCommunicationMessage gcm = new GroupCommunicationMessage(_groupName, _operatorName,
@@ -485,7 +457,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _sender.Send(gcm);
         }
 
-        private void ScatterHelper(List<T> messages, List<NodeStruct> order, int count)
+        private void ScatterHelper(IList<T> messages, List<NodeStruct> order, int count)
         {
             if (count <= 0)
             {
@@ -504,7 +476,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                     throw new ArgumentException("Scatter count must be positive");
                 }
 
-                List<T> sublist = messages.GetRange(i, size);
+                IList<T> sublist = messages.ToList().GetRange(i, size);
                 SendToNode(sublist, MessageType.Data, nodeStruct);
 
                 i += size;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75c23582/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 96f6ee2..b8989ee 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -79,6 +79,7 @@ under the License.
     <Compile Include="Group\Operators\Impl\ScatterSender.cs" />
     <Compile Include="Group\Operators\Impl\Sender.cs" />
     <Compile Include="Group\Operators\IOperatorSpec.cs" />
+    <Compile Include="Group\Task\IOperatorTopology.cs" />
     <Compile Include="Group\Operators\IReduceFunction.cs" />
     <Compile Include="Group\Operators\IReduceReceiver.cs" />
     <Compile Include="Group\Operators\IReduceSender.cs" />