You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/07/02 00:45:16 UTC

reef git commit: [REEF-1453] GroupCommunication should create a new observer for each Task

Repository: reef
Updated Branches:
  refs/heads/master 4ab6a8d1b -> df1a226d6


[REEF-1453] GroupCommunication should create a new observer for each Task

This addressed the issue by
  * Mapping Task observers directly to the IPEndpoints of their clients.
  * Adding a universal observer to ObserverContainer that distinguishes between the connecting IPEndpoints.
  * Removing the type map in ObserverContainer.
  * Restructuring the observer hierarchy such that the universal observer bootstraps the Task observers, which the node observers register with.

JIRA:
  [REEF-1453](https://issues.apache.org/jira/browse/REEF-1453)

This closes #1059


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/df1a226d
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/df1a226d
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/df1a226d

Branch: refs/heads/master
Commit: df1a226d63c25b4e1de25964ea34a074e0ddb32f
Parents: 4ab6a8d
Author: Andrew Chung <af...@gmail.com>
Authored: Tue Jun 28 14:36:23 2016 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Fri Jul 1 16:33:55 2016 -0700

----------------------------------------------------------------------
 .../Impl/GeneralGroupCommunicationMessage.cs    |  31 +++--
 .../Group/Driver/Impl/GroupCommDriver.cs        |   3 +-
 .../Driver/Impl/GroupCommunicationMessage.cs    |  16 +--
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   6 -
 .../Group/Operators/Impl/BroadcastSender.cs     |   6 -
 .../Group/Operators/Impl/ReduceReceiver.cs      |   7 -
 .../Group/Operators/Impl/ReduceSender.cs        |   4 -
 .../Group/Operators/Impl/ScatterReceiver.cs     |   8 +-
 .../Group/Operators/Impl/ScatterSender.cs       |   7 +-
 .../Task/ICommunicationGroupNetworkObserver.cs  |  43 ------
 .../Group/Task/IGroupCommNetworkObserver.cs     |  43 ------
 .../Group/Task/Impl/CommunicationGroupClient.cs |   7 -
 .../Impl/CommunicationGroupNetworkObserver.cs   | 111 ---------------
 .../Group/Task/Impl/GroupCommNetworkObserver.cs |  92 ++++++-------
 .../Group/Task/Impl/NodeMessageObserver.cs      |  80 +++++++++++
 .../Group/Task/Impl/NodeObserverIdentifier.cs   | 120 +++++++++++++++++
 .../Group/Task/Impl/NodeStruct.cs               |  16 ++-
 .../Group/Task/Impl/OperatorTopology.cs         |  61 ++-------
 .../Group/Task/Impl/TaskMessageObserver.cs      | 134 +++++++++++++++++++
 .../NetworkService/INetworkService.cs           |   6 +
 .../NetworkService/NetworkService.cs            |   8 ++
 .../NetworkService/StreamingNetworkService.cs   |  94 ++++++++++---
 .../Org.Apache.REEF.Network.csproj              |   6 +-
 .../Remote/Impl/ObserverContainer.cs            |  33 ++---
 24 files changed, 544 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
index 7cce556..1009fc0 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -15,21 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
+
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
     /// Messages sent by MPI Operators. This is the class inherited by 
     /// GroupCommunicationMessage but seen by Network Service
     /// </summary>
-    public class GeneralGroupCommunicationMessage
-    {        
-        /// <summary>
-        /// Empty constructor to allow instantiation by reflection
-        /// </summary>
-        protected GeneralGroupCommunicationMessage()
-        {
-        }
-
+    public abstract class GeneralGroupCommunicationMessage
+    {
         /// <summary>
         /// Create new CommunicationGroupMessage.
         /// </summary>
@@ -37,36 +32,44 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="operatorName">The name of the MPI operator</param>
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
+        /// <param name="messageType">The type of the GC message</param>
         protected GeneralGroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
-            string destination)
+            string destination,
+            Type messageType)
         {
             GroupName = groupName;
             OperatorName = operatorName;
             Source = source;
             Destination = destination;
+            MessageType = messageType;
         }
 
         /// <summary>
         /// Returns the Communication Group name.
         /// </summary>
-        internal string GroupName { get; set; }
+        internal string GroupName { get; private set; }
 
         /// <summary>
         /// Returns the MPI Operator name.
         /// </summary>
-        internal string OperatorName { get; set; }
+        internal string OperatorName { get; private set; }
 
         /// <summary>
         /// Returns the source of the message.
         /// </summary>
-        internal string Source { get; set; }
+        internal string Source { get; private set; }
 
         /// <summary>
         /// Returns the destination of the message.
         /// </summary>
-        internal string Destination { get; set; }
+        internal string Destination { get; private set; }
+
+        /// <summary>
+        /// The Type of the GroupCommunicationMessage.
+        /// </summary>
+        internal Type MessageType { get; private set; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index f49c38b..e636b04 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -34,6 +34,7 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Wake.Remote;
 using ContextConfiguration = Org.Apache.REEF.Common.Context.ContextConfiguration;
 
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
@@ -196,7 +197,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
 
             return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
                 .BindImplementation(
-                    GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class,
+                    GenericType<IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>>.Class,
                     GenericType<GroupCommNetworkObserver>.Class)
                 .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
                     GenericType<NamingConfigurationOptions.NameServerAddress>.Class,

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index aba944c..2f01cef 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
-
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
@@ -25,14 +23,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
     {
         /// <summary>
-        /// Empty constructor to allow instantiation by reflection
-        /// </summary>
-        [Inject]
-        private GroupCommunicationMessage()
-        {
-        }
-
-        /// <summary>
         /// Create new CommunicationGroupMessage.
         /// </summary>
         /// <param name="groupName">The name of the communication group</param>
@@ -46,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             string source,
             string destination,
             T message)
-            : base(groupName, operatorName, source, destination)
+            : base(groupName, operatorName, source, destination, typeof(T))
         {
             Data = new[] { message };
         }
@@ -65,7 +55,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             string source,
             string destination,
             T[] message)
-            : base(groupName, operatorName, source, destination)
+            : base(groupName, operatorName, source, destination, typeof(T))
         {
             Data = message;
         }
@@ -76,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         internal T[] Data
         {
             get;
-            set;
+            private set;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index 6152e05..bc72fea 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System.Reactive;
 using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -48,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The node's topology graph</param>
-        /// <param name="networkHandler">The incoming message handler</param>
         /// <param name="dataConverter">The converter used to convert original message to pipelined ones and vice versa.</param>
         [Inject]
         private BroadcastReceiver(
@@ -56,7 +54,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
             OperatorTopology<PipelineMessage<T>> topology,
-            ICommunicationGroupNetworkObserver networkHandler,
             IPipelineDataConverter<T> dataConverter)
         {
             OperatorName = operatorName;
@@ -65,9 +62,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             PipelineDataConverter = dataConverter;
             _topology = topology;
             _initialize = initialize;
-
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 279dd33..aa75c6e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -16,7 +16,6 @@
 // under the License.
 
 using System;
-using System.Reactive;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -49,7 +48,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The node's topology graph</param>
-        /// <param name="networkHandler">The incoming message handler</param>
         /// <param name="dataConverter">The converter used to convert original
         /// message to pipelined ones and vice versa.</param>
         [Inject]
@@ -58,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
             OperatorTopology<PipelineMessage<T>> topology,
-            ICommunicationGroupNetworkObserver networkHandler,
             IPipelineDataConverter<T> dataConverter)
         {
             _topology = topology;
@@ -67,9 +64,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             Version = PipelineVersion;
             PipelineDataConverter = dataConverter;
             _initialize = initialize;
-
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index d3a0102..e94c1ea 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System.Reactive;
 using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
@@ -49,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The task's operator topology graph</param>
-        /// <param name="networkHandler">Handles incoming messages from other tasks</param>
         /// <param name="reduceFunction">The class used to aggregate all incoming messages</param>
         /// <param name="dataConverter">The converter used to convert original
         /// message to pipelined ones and vice versa.</param>
@@ -59,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
             OperatorTopology<PipelineMessage<T>> topology,
-            ICommunicationGroupNetworkObserver networkHandler,
             IReduceFunction<T> reduceFunction,
             IPipelineDataConverter<T> dataConverter)
         {
@@ -72,9 +68,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
             _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
-
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index d2d1e5c..813db3e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -60,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
             OperatorTopology<PipelineMessage<T>> topology,
-            ICommunicationGroupNetworkObserver networkHandler,
             IReduceFunction<T> reduceFunction,
             IPipelineDataConverter<T> dataConverter)
         {
@@ -74,9 +73,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             _topology = topology;
             _initialize = initialize;
 
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
-
             PipelineDataConverter = dataConverter;
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index ac8de01..d6fdfa1 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -17,7 +17,6 @@
 
 using System.Collections.Generic;
 using System.Linq;
-using System.Reactive;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -47,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The task's operator topology graph</param>
-        /// <param name="networkHandler">Handles incoming messages from other tasks</param>
         [Inject]
         private ScatterReceiver(
             [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
-            OperatorTopology<T> topology, 
-            ICommunicationGroupNetworkObserver networkHandler)
+            OperatorTopology<T> topology)
         {
             OperatorName = operatorName;
             GroupName = groupName;
             Version = DefaultVersion;
             _topology = topology;
             _initialize = initialize;
-
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 0f2cf83..440738b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -46,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The operator topology</param>
-        /// <param name="networkHandler">The network handler</param>
         [Inject]
         private ScatterSender(
             [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
-            OperatorTopology<T> topology,
-            ICommunicationGroupNetworkObserver networkHandler)
+            OperatorTopology<T> topology)
         {
             OperatorName = operatorName;
             GroupName = groupName;
             Version = DefaultVersion;
             _topology = topology;
             _initialize = initialize;
-
-            var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
-            networkHandler.Register(operatorName, msgHandler);
         }
 
         public string OperatorName { get; private set; }

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
deleted file mode 100644
index afa407e..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Task.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Group.Task
-{
-    /// <summary>
-    /// Handles incoming messages sent to this Communication Group.
-    /// Writable Version
-    /// </summary>
-    [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
-    internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
-    {
-        /// <summary>
-        /// Registers the handler with the WritableCommunicationGroupNetworkObserver.
-        /// Messages that are to be sent to the operator specified by operatorName
-        /// are handled by the given observer.
-        /// </summary>
-        /// <param name="operatorName">The name of the operator whose handler
-        /// will be invoked</param>
-        /// <param name="observer">The handler to invoke when messages are sent
-        /// to the operator specified by operatorName</param>
-        void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
deleted file mode 100644
index 55d6942..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Task.Impl;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Group.Task
-{
-    /// <summary>
-    /// Handles all incoming messages for this Task.
-    /// Writable Version
-    /// </summary>
-    [DefaultImplementation(typeof(GroupCommNetworkObserver))]
-    internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
-    {
-        /// <summary>
-        /// Registers the network handler for the given CommunicationGroup.
-        /// When messages are sent to the specified group name, the given handler
-        /// will be invoked with that message.
-        /// </summary>
-        /// <param name="groupName">The group name for the network handler</param>
-        /// <param name="commGroupHandler">The network handler to invoke when
-        /// messages are sent to the given group.</param>
-        void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler);
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index aaeb98c..2fc90d9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -43,25 +43,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         /// <param name="groupName">The name of the CommunicationGroup</param>
         /// <param name="operatorConfigs">The serialized operator configurations</param>
-        /// <param name="groupCommNetworkObserver">The handler for all incoming messages
-        /// across all Communication Groups</param>
         /// <param name="configSerializer">Used to deserialize operator configuration.</param>
-        /// <param name="commGroupNetworkHandler">Communication group network observer that holds all the handlers for each operator.</param>
         /// <param name="injector">injector forked from the injector that creates this instance</param>
         [Inject]
         private CommunicationGroupClient(
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(GroupCommConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs,
-            IGroupCommNetworkObserver groupCommNetworkObserver,
             AvroConfigurationSerializer configSerializer,
-            ICommunicationGroupNetworkObserver commGroupNetworkHandler,
             IInjector injector)
         {
             _operators = new Dictionary<string, object>();
 
             GroupName = groupName;
-            groupCommNetworkObserver.Register(groupName, commGroupNetworkHandler);
-
             foreach (string operatorConfigStr in operatorConfigs)
             {                
                 IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr);

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
deleted file mode 100644
index 03d0dd4..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ /dev/null
@@ -1,111 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Network.Group.Task.Impl
-{
-    /// <summary>
-    /// Handles incoming messages sent to this Communication Group.
-    /// Writable version
-    /// </summary>
-    internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
-        private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers;
-
-        /// <summary>
-        /// Creates a new CommunicationGroupNetworkObserver.
-        /// </summary>
-        [Inject]
-        private CommunicationGroupNetworkObserver()
-        {
-            _handlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
-        }
-
-        /// <summary>
-        /// Registers the handler with the CommunicationGroupNetworkObserver.
-        /// Messages that are to be sent to the operator specified by operatorName
-        /// are handled by the given observer.
-        /// </summary>
-        /// <param name="operatorName">The name of the operator whose handler
-        /// will be invoked</param>
-        /// <param name="observer">The writable handler to invoke when messages are sent
-        /// to the operator specified by operatorName</param>
-        void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
-        {
-            if (string.IsNullOrEmpty(operatorName))
-            {
-                throw new ArgumentNullException("operatorName");
-            }
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            _handlers[operatorName] = observer;
-        }
-
-        /// <summary>
-        /// Handles the incoming GeneralGroupCommunicationMessage sent to this Communication Group.
-        /// Looks for the operator that the message is being sent to and invoke its handler.
-        /// </summary>
-        /// <param name="message">The incoming message</param>
-        public void OnNext(GeneralGroupCommunicationMessage message)
-        {
-            string operatorName = message.OperatorName;
-
-            IObserver<GeneralGroupCommunicationMessage> handler = GetOperatorHandler(operatorName);
-            if (handler == null)
-            {
-                Exceptions.Throw(new ArgumentException("No handler registered with the operator name: " + operatorName), LOGGER);
-            }
-            else
-            {
-                handler.OnNext(message);
-            }
-        }
-
-        /// <summary>
-        /// GetOperatorHandler for operatorName
-        /// </summary>
-        /// <param name="operatorName"></param>
-        /// <returns></returns>
-        private IObserver<GeneralGroupCommunicationMessage> GetOperatorHandler(string operatorName)
-        {
-            IObserver<GeneralGroupCommunicationMessage> handler;
-            if (!_handlers.TryGetValue(operatorName, out handler))
-            {
-                Exceptions.Throw(new ApplicationException("No handler registered yet with the operator name: " + operatorName), LOGGER);
-            }
-            return handler;
-        }
-
-        public void OnError(Exception error)
-        {
-        }
-
-        public void OnCompleted()
-        {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 47a4554..d82ddd2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -16,12 +16,18 @@
 // under the License.
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
+using System.Net;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -29,71 +35,67 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Handles all incoming messages for this Task.
     /// Writable version
     /// </summary>
-    internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
+    internal sealed class GroupCommNetworkObserver : IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>
     {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver));
+        private static readonly Logger Logger = Logger.GetLogger(typeof(GroupCommNetworkObserver));
 
-        private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _commGroupHandlers;
+        private readonly IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> _networkService;
+
+        private readonly ConcurrentDictionary<string, TaskMessageObserver> _taskMessageObservers =
+            new ConcurrentDictionary<string, TaskMessageObserver>();
+
+        /// <summary>
+        /// A ConcurrentDictionary is used here since there is no ConcurrentSet implementation in C#, and ConcurrentBag
+        /// does not allow for us to check for the existence of an item. The byte is simply a placeholder.
+        /// </summary>
+        private readonly ConcurrentDictionary<string, byte> _registeredNodes = new ConcurrentDictionary<string, byte>();
 
         /// <summary>
         /// Creates a new GroupCommNetworkObserver.
         /// </summary>
         [Inject]
-        private GroupCommNetworkObserver()
+        private GroupCommNetworkObserver(
+            IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> networkService)
         {
-            _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
+            _networkService = networkService;
         }
 
         /// <summary>
-        /// Handles the incoming WritableNsMessage for this Task.
-        /// Delegates the GeneralGroupCommunicationMessage to the correct 
-        /// WritableCommunicationGroupNetworkObserver.
+        /// Registers a <see cref="TaskMessageObserver"/> for a given <paramref name="taskSourceId"/>.
+        /// If the <see cref="TaskMessageObserver"/> has already been initialized, it will return
+        /// the existing one.
         /// </summary>
-        /// <param name="nsMessage"></param>
-        public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage)
+        public TaskMessageObserver RegisterAndGetForTask(string taskSourceId)
         {
-            if (nsMessage == null)
-            {
-                throw new ArgumentNullException("nsMessage");
-            }
-
-            try
-            {
-                GeneralGroupCommunicationMessage gcm = nsMessage.Data.First();
-                _commGroupHandlers[gcm.GroupName].OnNext(gcm);
-            }
-            catch (InvalidOperationException)
-            {
-                LOGGER.Log(Level.Error, "Group Communication Network Handler received message with no data");
-                throw;
-            }
-            catch (KeyNotFoundException)
-            {
-                LOGGER.Log(Level.Error, "Group Communication Network Handler received message for nonexistant group");
-                throw;
-            }
+            // Add a TaskMessage observer for each upstream/downstream source.
+            return _taskMessageObservers.GetOrAdd(taskSourceId, new TaskMessageObserver(_networkService.Get()));
         }
 
         /// <summary>
-        /// Registers the network handler for the given CommunicationGroup.
-        /// When messages are sent to the specified group name, the given handler
-        /// will be invoked with that message.
+        /// On the first message, we map the <see cref="TaskMessageObserver"/> to the <see cref="IPEndPoint"/>
+        /// of the sending Task and register the observer with <see cref="IRemoteManager{T}"/> by calling
+        /// <see cref="TaskMessageObserver.OnNext(IRemoteMessage{NsMessage{GeneralGroupCommunicationMessage}})"/>.
+        /// On subsequent messages we simply ignore the message and allow <see cref="ObserverContainer{T}"/> to send the message directly via the <see cref="IPEndPoint"/>.
         /// </summary>
-        /// <param name="groupName">The group name for the network handler</param>
-        /// <param name="commGroupHandler">The network handler to invoke when
-        /// messages are sent to the given group.</param>
-        public void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler)
+        /// <param name="remoteMessage">The incoming remote message; the source of its first data item selects the Task observer</param>
+        public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> remoteMessage)
         {
-            if (string.IsNullOrEmpty(groupName))
-            {
-                throw new ArgumentNullException("groupName");
-            }
-            if (commGroupHandler == null)
+            var nsMessage = remoteMessage.Message;
+            var gcm = nsMessage.Data.First();
+            var gcMessageTaskSource = gcm.Source;
+            TaskMessageObserver observer;
+            if (!_taskMessageObservers.TryGetValue(gcMessageTaskSource, out observer))
             {
-                throw new ArgumentNullException("commGroupHandler");
+                throw new KeyNotFoundException("Unable to find registered NodeMessageObserver for source Task " +
+                                                gcMessageTaskSource + ".");
             }
 
-            _commGroupHandlers[groupName] = commGroupHandler;
+            _registeredNodes.GetOrAdd(gcMessageTaskSource,
+                id =>
+                {
+                    observer.OnNext(remoteMessage);
+                    return new byte();
+                });
         }
 
         public void OnError(Exception error)
@@ -104,4 +106,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
new file mode 100644
index 0000000..00ca1d4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
@@ -0,0 +1,80 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// An observer for a node in a group communication graph.
+    /// </summary>
+    internal sealed class NodeMessageObserver<T> : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
+    {
+        private readonly NodeStruct<T> _nodeStruct;
+
+        internal NodeMessageObserver(NodeStruct<T> nodeStruct)
+        {
+            _nodeStruct = nodeStruct;
+        }
+
+        /// <summary>
+        /// Add data into the queue.
+        /// </summary>
+        /// <param name="value"></param>
+        public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value)
+        {
+            foreach (var data in value.Data)
+            {
+                var gcMessage = data as GroupCommunicationMessage<T>;
+                if (gcMessage != null && gcMessage.Data != null && gcMessage.Data.Length > 0)
+                {
+                    _nodeStruct.AddData(gcMessage);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the group name of the node.
+        /// </summary>
+        public string GroupName
+        {
+            get { return _nodeStruct.GroupName; }
+        }
+
+        /// <summary>
+        /// Gets the operator name of the node.
+        /// </summary>
+        public string OperatorName
+        {
+            get { return _nodeStruct.OperatorName; }
+        }
+
+        public void OnError(Exception error)
+        {
+            // TODO[JIRA REEF-1407]: Cancel on queue of node and handle error in application layer.
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            // TODO[JIRA REEF-1407]: Complete adding on queue of node.
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
new file mode 100644
index 0000000..884959f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
@@ -0,0 +1,120 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// An identifier for a given node in the group communication graph.
+    /// A node is uniquely identifiable by a combination of its Task ID, 
+    /// <see cref="Type"/>, <see cref="GroupName"/>, and <see cref="OperatorName"/>.
+    /// </summary>
+    internal sealed class NodeObserverIdentifier
+    {
+        private readonly Type _type;
+        private readonly string _groupName;
+        private readonly string _operatorName;
+
+        /// <summary>
+        /// Creates a NodeObserverIdentifier from an observer.
+        /// </summary>
+        public static NodeObserverIdentifier FromObserver<T>(NodeMessageObserver<T> observer)
+        {
+            return new NodeObserverIdentifier(typeof(T), observer.GroupName, observer.OperatorName);
+        }
+
+        /// <summary>
+        /// Creates a NodeObserverIdentifier from a group communication message.
+        /// </summary>
+        public static NodeObserverIdentifier FromMessage(GeneralGroupCommunicationMessage message)
+        {
+            return new NodeObserverIdentifier(message.MessageType, message.GroupName, message.OperatorName);
+        }
+
+        private NodeObserverIdentifier(Type type, string groupName, string operatorName)
+        {
+            _type = type;
+            _groupName = groupName;
+            _operatorName = operatorName;
+        }
+
+        /// <summary>
+        /// The Type of the node's messages.
+        /// </summary>
+        public Type Type
+        {
+            get { return _type; }
+        }
+
+        /// <summary>
+        /// The group name of the node.
+        /// </summary>
+        public string GroupName
+        {
+            get { return _groupName; }
+        }
+
+        /// <summary>
+        /// The operator name of the node.
+        /// </summary>
+        public string OperatorName
+        {
+            get { return _operatorName; }
+        }
+
+        /// <summary>
+        /// Overrides <see cref="Equals"/>. Simply compares equivalence of instance fields.
+        /// </summary>
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj))
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            return obj is NodeObserverIdentifier && Equals((NodeObserverIdentifier)obj);
+        }
+
+        /// <summary>
+        /// Overrides <see cref="GetHashCode"/>. Generates hashcode based on the instance fields.
+        /// </summary>
+        public override int GetHashCode()
+        {
+            int hash = 17;
+            hash = (hash * 31) + _type.GetHashCode();
+            hash = (hash * 31) + _groupName.GetHashCode();
+            return (hash * 31) + _operatorName.GetHashCode();
+        }
+
+        /// <summary>
+        /// Compare equality of instance fields.
+        /// </summary>
+        private bool Equals(NodeObserverIdentifier other)
+        {
+            return _type == other.Type &&
+                _groupName.Equals(other.GroupName) &&
+                _operatorName.Equals(other.OperatorName);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index e13d724..2140c61 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -33,9 +33,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Creates a new NodeStruct.
         /// </summary>
         /// <param name="id">The Task identifier</param>
-        internal NodeStruct(string id)
+        /// <param name="groupName">The group name of the node.</param>
+        /// <param name="operatorName">The operator name of the node</param>
+        internal NodeStruct(string id, string groupName, string operatorName)
         {
             Identifier = id;
+            GroupName = groupName;
+            OperatorName = operatorName;
             _messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>();
         }
 
@@ -46,6 +50,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         internal string Identifier { get; private set; }
 
         /// <summary>
+        /// The group name of the node.
+        /// </summary>
+        internal string GroupName { get; private set; }
+
+        /// <summary>
+        /// The operator name of the node.
+        /// </summary>
+        internal string OperatorName { get; private set; }
+
+        /// <summary>
         /// Gets the first message in the message queue.
         /// </summary>
         /// <returns>The first available message.</returns>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index bf63af4..66faa29 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Communication Group.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
+    public sealed class OperatorTopology<T> : IOperatorTopology<T>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
 
@@ -66,6 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="sleepTime">Sleep time between retry wating for registration</param>
         /// <param name="rootId">The identifier for the root Task in the topology graph</param>
         /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
+        /// <param name="networkObserver">The observer with which the message observers for the parent and child nodes are registered</param>
         /// <param name="networkService">The network service</param>
         /// <param name="sender">The Sender used to do point to point communication</param>
         [Inject]
@@ -78,6 +79,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
+            GroupCommNetworkObserver networkObserver,
             StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
             Sender sender)
         {
@@ -89,12 +91,20 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _sleepTime = sleepTime;
             _nameClient = networkService.NamingClient;
             _sender = sender;
+            _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId, groupName, operatorName);
 
-            _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId);
+            // Register the observers for Task IDs and nodes adjacent to the current node
+            // in the group communication graph.
+            if (_parent != null)
+            {
+                networkObserver.RegisterAndGetForTask(_parent.Identifier).RegisterNodeObserver(new NodeMessageObserver<T>(_parent));
+            }
 
             foreach (var childId in childIds)
             {
-                _childNodeContainer.PutNode(new NodeStruct<T>(childId));
+                var childNode = new NodeStruct<T>(childId, groupName, operatorName);
+                _childNodeContainer.PutNode(childNode);
+                networkObserver.RegisterAndGetForTask(childId).RegisterNodeObserver(new NodeMessageObserver<T>(childNode));
             }
         }
 
@@ -123,41 +133,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         }
 
         /// <summary>
-        /// Handles the incoming GroupCommunicationMessage.
-        /// Updates the sending node's message queue.
-        /// </summary>
-        /// <param name="gcm">The incoming message</param>
-        public void OnNext(GeneralGroupCommunicationMessage gcm)
-        {
-            if (gcm == null)
-            {
-                throw new ArgumentNullException("gcm");
-            }
-            if (gcm.Source == null)
-            {
-                throw new ArgumentException("Message must have a source");
-            }
-
-            var sourceNode = (_parent != null && _parent.Identifier == gcm.Source) 
-                ? _parent 
-                : _childNodeContainer.GetChild(gcm.Source);
-
-            if (sourceNode == null)
-            {
-                throw new IllegalStateException("Received message from invalid task id: " + gcm.Source);
-            }
-
-            var message = gcm as GroupCommunicationMessage<T>;
-
-            if (message == null)
-            {
-                throw new NullReferenceException("message passed not of type GroupCommunicationMessage");
-            }
-
-            sourceNode.AddData(message);
-        }
-
-        /// <summary>
         /// Sends the message to the parent Task.
         /// </summary>
         /// <param name="message">The message to send</param>
@@ -308,14 +283,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren());
         }
 
-        public void OnError(Exception error)
-        {
-        }
-
-        public void OnCompleted()
-        {
-        }
-
         public bool HasChildren()
         {
             return _childNodeContainer.Count > 0;
@@ -396,4 +363,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
new file mode 100644
index 0000000..d8dd449
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
@@ -0,0 +1,134 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// The observer for a Task that multiplexes to the node observers associated with that Task.
+    /// </summary>
+    internal sealed class TaskMessageObserver :
+        IObserver<NsMessage<GeneralGroupCommunicationMessage>>, 
+        IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>
+    {
+        private readonly Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>> _observers =
+            new Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>>();
+
+        private readonly StreamingNetworkService<GeneralGroupCommunicationMessage> _networkService;
+        private readonly object _registrationLock = new object();
+        private bool _hasRegistered = false;
+        private volatile NsMessage<GeneralGroupCommunicationMessage> _registrationMessage;
+
+        public TaskMessageObserver(StreamingNetworkService<GeneralGroupCommunicationMessage> networkService)
+        {
+            _networkService = networkService;
+        }
+
+        /// <summary>
+        /// Registers a node associated with the Task.
+        /// </summary>
+        public void RegisterNodeObserver<T>(NodeMessageObserver<T> observer)
+        {
+            _observers.Add(NodeObserverIdentifier.FromObserver(observer), observer);
+        }
+
+        /// <summary>
+        /// This is called directly from the observer container with the registered IPEndpoint
+        /// of the Task ID.
+        /// </summary>
+        public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value)
+        {
+            Handle(value);
+        }
+
+        /// <summary>
+        /// This is called from the universal observer in ObserverContainer for the first message.
+        /// </summary>
+        public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> value)
+        {
+            // Lock to prevent duplication of messages.
+            lock (_registrationLock)
+            {
+                if (_hasRegistered)
+                {
+                    return;
+                }
+
+                var socketRemoteId = value.Identifier as SocketRemoteIdentifier;
+                if (socketRemoteId == null)
+                {
+                    throw new InvalidOperationException();
+                }
+
+                // Handle the message first, then register the observer.
+                Handle(value.Message, true);
+                _networkService.RemoteManager.RegisterObserver(socketRemoteId.Addr, this);
+                _hasRegistered = true;
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            // TODO[JIRA REEF-1407]: Propagate Exception to nodes associated with the Task.
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            // TODO[JIRA REEF-1407]: Propagate completion to nodes associated with the Task.
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Handles the group communication message.
+        /// </summary>
+        private void Handle(NsMessage<GeneralGroupCommunicationMessage> value, bool isRegistration = false)
+        {
+            // This is mainly used to handle the case should ObserverContainer
+            // decide to trigger handlers concurrently for a single message.
+            if (isRegistration)
+            {
+                // Process the registration message
+                _registrationMessage = value;
+            }
+            else if (_registrationMessage != null && value == _registrationMessage)
+            {
+                // This means that we've already processed the message.
+                // Ignore this message and discard the reference.
+                _registrationMessage = null;
+                return;
+            }
+
+            var gcMessage = value.Data.First();
+
+            IObserver<NsMessage<GeneralGroupCommunicationMessage>> observer;
+            if (!_observers.TryGetValue(NodeObserverIdentifier.FromMessage(gcMessage), out observer))
+            {
+                throw new InvalidOperationException();
+            }
+
+            observer.OnNext(value);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
index 9536d39..28208db 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
@@ -18,6 +18,7 @@
 using System;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Network.NetworkService
 {
@@ -33,6 +34,11 @@ namespace Org.Apache.REEF.Network.NetworkService
         INameClient NamingClient { get; }
 
         /// <summary>
+        /// The remote manager of the NetworkService.
+        /// </summary>
+        IRemoteManager<NsMessage<T>> RemoteManager { get; }
+
+        /// <summary>
         /// Open a new connection to the remote host registered to
         /// the name service with the given identifier
         /// </summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
index bd0c94b..53a74b4 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -81,6 +81,14 @@ namespace Org.Apache.REEF.Network.NetworkService
         public INameClient NamingClient { get; private set; }
 
         /// <summary>
+        /// The remote manager of the network service.
+        /// </summary>
+        public IRemoteManager<NsMessage<T>> RemoteManager
+        {
+            get { return _remoteManager; }
+        }
+
+        /// <summary>
         /// Open a new connection to the remote host registered to
         /// the name service with the given identifier
         /// </summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
index cfee235..a34e8cb 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -22,13 +22,10 @@ using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Network.NetworkService.Codec;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.StreamingCodec;
-using Org.Apache.REEF.Wake.Util;
 
 namespace Org.Apache.REEF.Network.NetworkService
 {
@@ -42,37 +39,78 @@ namespace Org.Apache.REEF.Network.NetworkService
 
         private readonly IRemoteManager<NsMessage<T>> _remoteManager;
         private IIdentifier _localIdentifier;
-        private readonly IDisposable _messageHandlerDisposable;
+        private readonly IDisposable _universalObserverDisposable;
+        private readonly IDisposable _remoteMessageUniversalObserver;
         private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
         private readonly INameClient _nameClient;
 
         /// <summary>
         /// Create a new Writable NetworkService.
         /// </summary>
-        /// <param name="messageHandler">The observer to handle incoming messages</param>
-        /// <param name="idFactory">The factory used to create IIdentifiers</param>
+        /// <param name="universalObserver">The observer to handle incoming messages</param>
         /// <param name="nameClient">The name client used to register Ids</param>
-        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a 
-        /// Writable RemoteManager</param>
+        /// <param name="remoteManagerFactory">
+        /// Writable RemoteManagerFactory to create a Writable RemoteManager
+        /// </param>
         /// <param name="codec">Codec for Network Service message</param>
         /// <param name="localAddressProvider">The local address provider</param>
-        /// <param name="injector">Fork of the injector that created the Network service</param>
         [Inject]
         private StreamingNetworkService(
-            IObserver<NsMessage<T>> messageHandler,
-            IIdentifierFactory idFactory,
+            IObserver<NsMessage<T>> universalObserver,
             INameClient nameClient,
             StreamingRemoteManagerFactory remoteManagerFactory,
             NsMessageStreamingCodec<T> codec,
-            ILocalAddressProvider localAddressProvider,
-            IInjector injector)
+            ILocalAddressProvider localAddressProvider)
+            : this(universalObserver, null, nameClient, remoteManagerFactory, codec, localAddressProvider)
+        {
+        }
+
+        /// <summary>
+        /// Create a new Writable NetworkService.
+        /// </summary>
+        /// <param name="remoteMessageUniversalObserver">The observer to handle incoming messages along with their remote identifiers</param>
+        /// <param name="nameClient">The name client used to register Ids</param>
+        /// <param name="remoteManagerFactory">
+        /// Writable RemoteManagerFactory to create a Writable RemoteManager
+        /// </param>
+        /// <param name="codec">Codec for Network Service message</param>
+        /// <param name="localAddressProvider">The local address provider</param>
+        [Inject]
+        private StreamingNetworkService(
+            IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver,
+            INameClient nameClient,
+            StreamingRemoteManagerFactory remoteManagerFactory,
+            NsMessageStreamingCodec<T> codec,
+            ILocalAddressProvider localAddressProvider)
+            : this(null, remoteMessageUniversalObserver, nameClient, remoteManagerFactory, codec, localAddressProvider)
+        {
+        }
+
+        [Inject]
+        private StreamingNetworkService(
+            IObserver<NsMessage<T>> universalObserver,
+            IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver,
+            INameClient nameClient,
+            StreamingRemoteManagerFactory remoteManagerFactory,
+            NsMessageStreamingCodec<T> codec,
+            ILocalAddressProvider localAddressProvider)
         {
             _remoteManager = remoteManagerFactory.GetInstance(localAddressProvider.LocalAddress, codec);
 
-            // Create and register incoming message handler
-            // TODO[REEF-419] This should use the TcpPortProvider mechanism
-            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
-            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+            if (universalObserver != null)
+            {
+                // Create and register incoming message handler
+                // TODO[REEF-419] This should use the TcpPortProvider mechanism
+                var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+                _universalObserverDisposable = _remoteManager.RegisterObserver(anyEndpoint, universalObserver);
+            }
+            else
+            {
+                _universalObserverDisposable = null;
+            }
+
+            _remoteMessageUniversalObserver = remoteMessageUniversalObserver != null ?
+                _remoteManager.RegisterObserver(remoteMessageUniversalObserver) : null;
 
             _nameClient = nameClient;
             _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
@@ -89,6 +127,14 @@ namespace Org.Apache.REEF.Network.NetworkService
         }
 
         /// <summary>
+        /// RemoteManager for registering Observers.
+        /// </summary>
+        public IRemoteManager<NsMessage<T>> RemoteManager
+        {
+            get { return _remoteManager; }
+        }
+
+        /// <summary>
         /// Open a new connection to the remote host registered to
         /// the name service with the given identifier
         /// </summary>
@@ -141,8 +187,18 @@ namespace Org.Apache.REEF.Network.NetworkService
             }
 
             NamingClient.Unregister(_localIdentifier.ToString());
+
             _localIdentifier = null;
-            _messageHandlerDisposable.Dispose();
+
+            if (_universalObserverDisposable != null)
+            {
+                _universalObserverDisposable.Dispose();
+            }
+
+            if (_remoteMessageUniversalObserver != null)
+            {
+                _remoteMessageUniversalObserver.Dispose();
+            }
         }
 
         /// <summary>
@@ -156,4 +212,4 @@ namespace Org.Apache.REEF.Network.NetworkService
             Logger.Log(Level.Verbose, "Disposed of network service");
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 6472597..c53027d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -87,6 +87,9 @@ under the License.
     <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
     <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
     <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />
+    <Compile Include="Group\Task\Impl\NodeMessageObserver.cs" />
+    <Compile Include="Group\Task\Impl\NodeObserverIdentifier.cs" />
+    <Compile Include="Group\Task\Impl\TaskMessageObserver.cs" />
     <Compile Include="Group\Task\IOperatorTopology.cs" />
     <Compile Include="Group\Operators\IReduceFunction.cs" />
     <Compile Include="Group\Operators\IReduceReceiver.cs" />
@@ -98,11 +101,8 @@ under the License.
     <Compile Include="Group\Pipelining\PipelineMessage.cs" />
     <Compile Include="Group\Pipelining\PipelineMessageCodec.cs" />
     <Compile Include="Group\Task\ICommunicationGroupClient.cs" />
-    <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" />
     <Compile Include="Group\Task\IGroupCommClient.cs" />
-    <Compile Include="Group\Task\IGroupCommNetworkObserver.cs" />
     <Compile Include="Group\Task\Impl\CommunicationGroupClient.cs" />
-    <Compile Include="Group\Task\Impl\CommunicationGroupNetworkObserver.cs" />
     <Compile Include="Group\Task\Impl\GroupCommClient.cs" />
     <Compile Include="Group\Task\Impl\GroupCommNetworkObserver.cs" />
     <Compile Include="Group\Task\Impl\NodeStruct.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
index 857e87c..cc4c57b 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
@@ -25,12 +25,13 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
     /// <summary>
     /// Stores registered IObservers for DefaultRemoteManager.
     /// Can register and look up IObservers by remote IPEndPoint.
+    /// TODO[JIRA REEF-1407]: Remove <see cref="IObserver{T}"/> and add custom OnError/OnCompleted with IPEndpoints.
     /// </summary>
     internal sealed class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
     {
         private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
-        private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
         private IObserver<T> _universalObserver;
+        private IObserver<IRemoteMessage<T>> _remoteMessageUniversalObserver;
 
         /// <summary>
         /// Constructs a new ObserverContainer used to manage remote IObservers.
@@ -38,7 +39,6 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         public ObserverContainer()
         {
             _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
-            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
         }
 
         /// <summary>
@@ -67,8 +67,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <returns>An IDisposable used to unregister the observer with</returns>
         public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
         {
-            _typeMap[typeof(T)] = observer;
-            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+            _remoteMessageUniversalObserver = observer;
+            return Disposable.Create(() => _remoteMessageUniversalObserver = null);
         }
 
         /// <summary>
@@ -84,25 +84,26 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             T value = remoteEvent.Value;
             bool handled = false;
 
-            IObserver<T> observer1;
-            IObserver<IRemoteMessage<T>> observer2;
             if (_universalObserver != null)
             {
                 _universalObserver.OnNext(value);
                 handled = true;
             }
-            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
-            {
-                // IObserver was registered by IPEndpoint
-                observer1.OnNext(value);
-                handled = true;
-            } 
-            else if (_typeMap.TryGetValue(value.GetType(), out observer2))
+
+            if (_remoteMessageUniversalObserver != null)
             {
                 // IObserver was registered by event type
                 IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
                 IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
-                observer2.OnNext(remoteMessage);
+                _remoteMessageUniversalObserver.OnNext(remoteMessage);
+                handled = true;
+            }
+
+            IObserver<T> observer;
+            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer))
+            {
+                // IObserver was registered by IPEndpoint
+                observer.OnNext(value);
                 handled = true;
             }
 
@@ -114,10 +115,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
         public void OnError(Exception error)
         {
+            // TODO[JIRA REEF-1407]: Propagate Exception upwards. May need to change signature.
         }
 
         public void OnCompleted()
         {
+            // TODO[JIRA REEF-1407]: Propagate completion upwards. May need to change signature.
         }
     }
-}
+}
\ No newline at end of file