You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/07/02 00:45:16 UTC
reef git commit: [REEF-1453] GroupCommunication should create a new
observer for each Task
Repository: reef
Updated Branches:
refs/heads/master 4ab6a8d1b -> df1a226d6
[REEF-1453] GroupCommunication should create a new observer for each Task
This addressed the issue by
* Mapping Task observers directly to the IPEndpoints of their clients.
* Adding a universal observer to ObserverContainer that distinguishes between connecting IPEndpoints.
* Removing the type map in ObserverContainer.
* Restructuring the observer hierarchy such that the universal observer bootstraps the Task observers, which the node observers register with.
JIRA:
[REEF-1453](https://issues.apache.org/jira/browse/REEF-1453)
This closes #1059
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/df1a226d
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/df1a226d
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/df1a226d
Branch: refs/heads/master
Commit: df1a226d63c25b4e1de25964ea34a074e0ddb32f
Parents: 4ab6a8d
Author: Andrew Chung <af...@gmail.com>
Authored: Tue Jun 28 14:36:23 2016 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Fri Jul 1 16:33:55 2016 -0700
----------------------------------------------------------------------
.../Impl/GeneralGroupCommunicationMessage.cs | 31 +++--
.../Group/Driver/Impl/GroupCommDriver.cs | 3 +-
.../Driver/Impl/GroupCommunicationMessage.cs | 16 +--
.../Group/Operators/Impl/BroadcastReceiver.cs | 6 -
.../Group/Operators/Impl/BroadcastSender.cs | 6 -
.../Group/Operators/Impl/ReduceReceiver.cs | 7 -
.../Group/Operators/Impl/ReduceSender.cs | 4 -
.../Group/Operators/Impl/ScatterReceiver.cs | 8 +-
.../Group/Operators/Impl/ScatterSender.cs | 7 +-
.../Task/ICommunicationGroupNetworkObserver.cs | 43 ------
.../Group/Task/IGroupCommNetworkObserver.cs | 43 ------
.../Group/Task/Impl/CommunicationGroupClient.cs | 7 -
.../Impl/CommunicationGroupNetworkObserver.cs | 111 ---------------
.../Group/Task/Impl/GroupCommNetworkObserver.cs | 92 ++++++-------
.../Group/Task/Impl/NodeMessageObserver.cs | 80 +++++++++++
.../Group/Task/Impl/NodeObserverIdentifier.cs | 120 +++++++++++++++++
.../Group/Task/Impl/NodeStruct.cs | 16 ++-
.../Group/Task/Impl/OperatorTopology.cs | 61 ++-------
.../Group/Task/Impl/TaskMessageObserver.cs | 134 +++++++++++++++++++
.../NetworkService/INetworkService.cs | 6 +
.../NetworkService/NetworkService.cs | 8 ++
.../NetworkService/StreamingNetworkService.cs | 94 ++++++++++---
.../Org.Apache.REEF.Network.csproj | 6 +-
.../Remote/Impl/ObserverContainer.cs | 33 ++---
24 files changed, 544 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
index 7cce556..1009fc0 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -15,21 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+using System;
+
namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
/// <summary>
/// Messages sent by MPI Operators. This is the class inherited by
/// GroupCommunicationMessage but seen by Network Service
/// </summary>
- public class GeneralGroupCommunicationMessage
- {
- /// <summary>
- /// Empty constructor to allow instantiation by reflection
- /// </summary>
- protected GeneralGroupCommunicationMessage()
- {
- }
-
+ public abstract class GeneralGroupCommunicationMessage
+ {
/// <summary>
/// Create new CommunicationGroupMessage.
/// </summary>
@@ -37,36 +32,44 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// <param name="operatorName">The name of the MPI operator</param>
/// <param name="source">The message source</param>
/// <param name="destination">The message destination</param>
+ /// <param name="messageType">The type of the GC message</param>
protected GeneralGroupCommunicationMessage(
string groupName,
string operatorName,
string source,
- string destination)
+ string destination,
+ Type messageType)
{
GroupName = groupName;
OperatorName = operatorName;
Source = source;
Destination = destination;
+ MessageType = messageType;
}
/// <summary>
/// Returns the Communication Group name.
/// </summary>
- internal string GroupName { get; set; }
+ internal string GroupName { get; private set; }
/// <summary>
/// Returns the MPI Operator name.
/// </summary>
- internal string OperatorName { get; set; }
+ internal string OperatorName { get; private set; }
/// <summary>
/// Returns the source of the message.
/// </summary>
- internal string Source { get; set; }
+ internal string Source { get; private set; }
/// <summary>
/// Returns the destination of the message.
/// </summary>
- internal string Destination { get; set; }
+ internal string Destination { get; private set; }
+
+ /// <summary>
+ /// The Type of the GroupCommunicationMessage.
+ /// </summary>
+ internal Type MessageType { get; private set; }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index f49c38b..e636b04 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -34,6 +34,7 @@ using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Wake.Remote;
using ContextConfiguration = Org.Apache.REEF.Common.Context.ContextConfiguration;
namespace Org.Apache.REEF.Network.Group.Driver.Impl
@@ -196,7 +197,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
.BindImplementation(
- GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class,
+ GenericType<IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>>.Class,
GenericType<GroupCommNetworkObserver>.Class)
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index aba944c..2f01cef 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
-
namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
/// <summary>
@@ -25,14 +23,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
{
/// <summary>
- /// Empty constructor to allow instantiation by reflection
- /// </summary>
- [Inject]
- private GroupCommunicationMessage()
- {
- }
-
- /// <summary>
/// Create new CommunicationGroupMessage.
/// </summary>
/// <param name="groupName">The name of the communication group</param>
@@ -46,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
string source,
string destination,
T message)
- : base(groupName, operatorName, source, destination)
+ : base(groupName, operatorName, source, destination, typeof(T))
{
Data = new[] { message };
}
@@ -65,7 +55,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
string source,
string destination,
T[] message)
- : base(groupName, operatorName, source, destination)
+ : base(groupName, operatorName, source, destination, typeof(T))
{
Data = message;
}
@@ -76,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
internal T[] Data
{
get;
- set;
+ private set;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index 6152e05..bc72fea 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-using System.Reactive;
using System.Collections.Generic;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -48,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The node's topology graph</param>
- /// <param name="networkHandler">The incoming message handler</param>
/// <param name="dataConverter">The converter used to convert original message to pipelined ones and vice versa.</param>
[Inject]
private BroadcastReceiver(
@@ -56,7 +54,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
- ICommunicationGroupNetworkObserver networkHandler,
IPipelineDataConverter<T> dataConverter)
{
OperatorName = operatorName;
@@ -65,9 +62,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
PipelineDataConverter = dataConverter;
_topology = topology;
_initialize = initialize;
-
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 279dd33..aa75c6e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -16,7 +16,6 @@
// under the License.
using System;
-using System.Reactive;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
@@ -49,7 +48,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The node's topology graph</param>
- /// <param name="networkHandler">The incoming message handler</param>
/// <param name="dataConverter">The converter used to convert original
/// message to pipelined ones and vice versa.</param>
[Inject]
@@ -58,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
- ICommunicationGroupNetworkObserver networkHandler,
IPipelineDataConverter<T> dataConverter)
{
_topology = topology;
@@ -67,9 +64,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
Version = PipelineVersion;
PipelineDataConverter = dataConverter;
_initialize = initialize;
-
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index d3a0102..e94c1ea 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-using System.Reactive;
using System.Collections.Generic;
using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Tang.Annotations;
@@ -49,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The task's operator topology graph</param>
- /// <param name="networkHandler">Handles incoming messages from other tasks</param>
/// <param name="reduceFunction">The class used to aggregate all incoming messages</param>
/// <param name="dataConverter">The converter used to convert original
/// message to pipelined ones and vice versa.</param>
@@ -59,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
- ICommunicationGroupNetworkObserver networkHandler,
IReduceFunction<T> reduceFunction,
IPipelineDataConverter<T> dataConverter)
{
@@ -72,9 +68,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
_pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
_topology = topology;
-
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index d2d1e5c..813db3e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -60,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
- ICommunicationGroupNetworkObserver networkHandler,
IReduceFunction<T> reduceFunction,
IPipelineDataConverter<T> dataConverter)
{
@@ -74,9 +73,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
_topology = topology;
_initialize = initialize;
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
-
PipelineDataConverter = dataConverter;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index ac8de01..d6fdfa1 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -17,7 +17,6 @@
using System.Collections.Generic;
using System.Linq;
-using System.Reactive;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
@@ -47,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The task's operator topology graph</param>
- /// <param name="networkHandler">Handles incoming messages from other tasks</param>
[Inject]
private ScatterReceiver(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
- OperatorTopology<T> topology,
- ICommunicationGroupNetworkObserver networkHandler)
+ OperatorTopology<T> topology)
{
OperatorName = operatorName;
GroupName = groupName;
Version = DefaultVersion;
_topology = topology;
_initialize = initialize;
-
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 0f2cf83..440738b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -46,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
/// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The operator topology</param>
- /// <param name="networkHandler">The network handler</param>
[Inject]
private ScatterSender(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
- OperatorTopology<T> topology,
- ICommunicationGroupNetworkObserver networkHandler)
+ OperatorTopology<T> topology)
{
OperatorName = operatorName;
GroupName = groupName;
Version = DefaultVersion;
_topology = topology;
_initialize = initialize;
-
- var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message));
- networkHandler.Register(operatorName, msgHandler);
}
public string OperatorName { get; private set; }
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
deleted file mode 100644
index afa407e..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Task.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Group.Task
-{
- /// <summary>
- /// Handles incoming messages sent to this Communication Group.
- /// Writable Version
- /// </summary>
- [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
- internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
- {
- /// <summary>
- /// Registers the handler with the WritableCommunicationGroupNetworkObserver.
- /// Messages that are to be sent to the operator specified by operatorName
- /// are handled by the given observer.
- /// </summary>
- /// <param name="operatorName">The name of the operator whose handler
- /// will be invoked</param>
- /// <param name="observer">The handler to invoke when messages are sent
- /// to the operator specified by operatorName</param>
- void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
deleted file mode 100644
index 55d6942..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Task.Impl;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Group.Task
-{
- /// <summary>
- /// Handles all incoming messages for this Task.
- /// Writable Version
- /// </summary>
- [DefaultImplementation(typeof(GroupCommNetworkObserver))]
- internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
- {
- /// <summary>
- /// Registers the network handler for the given CommunicationGroup.
- /// When messages are sent to the specified group name, the given handler
- /// will be invoked with that message.
- /// </summary>
- /// <param name="groupName">The group name for the network handler</param>
- /// <param name="commGroupHandler">The network handler to invoke when
- /// messages are sent to the given group.</param>
- void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index aaeb98c..2fc90d9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -43,25 +43,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// </summary>
/// <param name="groupName">The name of the CommunicationGroup</param>
/// <param name="operatorConfigs">The serialized operator configurations</param>
- /// <param name="groupCommNetworkObserver">The handler for all incoming messages
- /// across all Communication Groups</param>
/// <param name="configSerializer">Used to deserialize operator configuration.</param>
- /// <param name="commGroupNetworkHandler">Communication group network observer that holds all the handlers for each operator.</param>
/// <param name="injector">injector forked from the injector that creates this instance</param>
[Inject]
private CommunicationGroupClient(
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs,
- IGroupCommNetworkObserver groupCommNetworkObserver,
AvroConfigurationSerializer configSerializer,
- ICommunicationGroupNetworkObserver commGroupNetworkHandler,
IInjector injector)
{
_operators = new Dictionary<string, object>();
GroupName = groupName;
- groupCommNetworkObserver.Register(groupName, commGroupNetworkHandler);
-
foreach (string operatorConfigStr in operatorConfigs)
{
IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr);
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
deleted file mode 100644
index 03d0dd4..0000000
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ /dev/null
@@ -1,111 +0,0 @@
-\ufeff// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Network.Group.Task.Impl
-{
- /// <summary>
- /// Handles incoming messages sent to this Communication Group.
- /// Writable version
- /// </summary>
- internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
- private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers;
-
- /// <summary>
- /// Creates a new CommunicationGroupNetworkObserver.
- /// </summary>
- [Inject]
- private CommunicationGroupNetworkObserver()
- {
- _handlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
- }
-
- /// <summary>
- /// Registers the handler with the CommunicationGroupNetworkObserver.
- /// Messages that are to be sent to the operator specified by operatorName
- /// are handled by the given observer.
- /// </summary>
- /// <param name="operatorName">The name of the operator whose handler
- /// will be invoked</param>
- /// <param name="observer">The writable handler to invoke when messages are sent
- /// to the operator specified by operatorName</param>
- void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
- {
- if (string.IsNullOrEmpty(operatorName))
- {
- throw new ArgumentNullException("operatorName");
- }
- if (observer == null)
- {
- throw new ArgumentNullException("observer");
- }
-
- _handlers[operatorName] = observer;
- }
-
- /// <summary>
- /// Handles the incoming GeneralGroupCommunicationMessage sent to this Communication Group.
- /// Looks for the operator that the message is being sent to and invoke its handler.
- /// </summary>
- /// <param name="message">The incoming message</param>
- public void OnNext(GeneralGroupCommunicationMessage message)
- {
- string operatorName = message.OperatorName;
-
- IObserver<GeneralGroupCommunicationMessage> handler = GetOperatorHandler(operatorName);
- if (handler == null)
- {
- Exceptions.Throw(new ArgumentException("No handler registered with the operator name: " + operatorName), LOGGER);
- }
- else
- {
- handler.OnNext(message);
- }
- }
-
- /// <summary>
- /// GetOperatorHandler for operatorName
- /// </summary>
- /// <param name="operatorName"></param>
- /// <returns></returns>
- private IObserver<GeneralGroupCommunicationMessage> GetOperatorHandler(string operatorName)
- {
- IObserver<GeneralGroupCommunicationMessage> handler;
- if (!_handlers.TryGetValue(operatorName, out handler))
- {
- Exceptions.Throw(new ApplicationException("No handler registered yet with the operator name: " + operatorName), LOGGER);
- }
- return handler;
- }
-
- public void OnError(Exception error)
- {
- }
-
- public void OnCompleted()
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 47a4554..d82ddd2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -16,12 +16,18 @@
// under the License.
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Net;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
namespace Org.Apache.REEF.Network.Group.Task.Impl
{
@@ -29,71 +35,67 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Handles all incoming messages for this Task.
/// Writable version
/// </summary>
- internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
+ internal sealed class GroupCommNetworkObserver : IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>
{
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver));
+ private static readonly Logger Logger = Logger.GetLogger(typeof(GroupCommNetworkObserver));
- private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _commGroupHandlers;
+ private readonly IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> _networkService;
+
+ private readonly ConcurrentDictionary<string, TaskMessageObserver> _taskMessageObservers =
+ new ConcurrentDictionary<string, TaskMessageObserver>();
+
+ /// <summary>
+ /// A ConcurrentDictionary is used here since there is no ConcurrentSet implementation in C#, and ConcurrentBag
+ /// does not allow for us to check for the existence of an item. The byte is simply a placeholder.
+ /// </summary>
+ private readonly ConcurrentDictionary<string, byte> _registeredNodes = new ConcurrentDictionary<string, byte>();
/// <summary>
/// Creates a new GroupCommNetworkObserver.
/// </summary>
[Inject]
- private GroupCommNetworkObserver()
+ private GroupCommNetworkObserver(
+ IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> networkService)
{
- _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
+ _networkService = networkService;
}
/// <summary>
- /// Handles the incoming WritableNsMessage for this Task.
- /// Delegates the GeneralGroupCommunicationMessage to the correct
- /// WritableCommunicationGroupNetworkObserver.
+ /// Registers a <see cref="TaskMessageObserver"/> for a given <paramref name="taskSourceId"/>.
+ /// If the <see cref="TaskMessageObserver"/> has already been initialized, it will return
+ /// the existing one.
/// </summary>
- /// <param name="nsMessage"></param>
- public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage)
+ public TaskMessageObserver RegisterAndGetForTask(string taskSourceId)
{
- if (nsMessage == null)
- {
- throw new ArgumentNullException("nsMessage");
- }
-
- try
- {
- GeneralGroupCommunicationMessage gcm = nsMessage.Data.First();
- _commGroupHandlers[gcm.GroupName].OnNext(gcm);
- }
- catch (InvalidOperationException)
- {
- LOGGER.Log(Level.Error, "Group Communication Network Handler received message with no data");
- throw;
- }
- catch (KeyNotFoundException)
- {
- LOGGER.Log(Level.Error, "Group Communication Network Handler received message for nonexistant group");
- throw;
- }
+ // One TaskMessageObserver per upstream/downstream source; the factory only builds one when the ID is new.
+ return _taskMessageObservers.GetOrAdd(taskSourceId, id => new TaskMessageObserver(_networkService.Get()));
}
/// <summary>
- /// Registers the network handler for the given CommunicationGroup.
- /// When messages are sent to the specified group name, the given handler
- /// will be invoked with that message.
+ /// On the first message, we map the <see cref="TaskMessageObserver"/> to the <see cref="IPEndPoint"/>
+ /// of the sending Task and register the observer with <see cref="IRemoteManager{T}"/>
+ /// by calling <see cref="TaskMessageObserver.OnNext(IRemoteMessage{NsMessage{GeneralGroupCommunicationMessage}})"/>. On subsequent messages we simply ignore the message
+ /// and allow <see cref="ObserverContainer{T}"/> to send the message directly via the <see cref="IPEndPoint"/>.
/// </summary>
- /// <param name="groupName">The group name for the network handler</param>
- /// <param name="commGroupHandler">The network handler to invoke when
- /// messages are sent to the given group.</param>
- public void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler)
+ /// <param name="remoteMessage">The incoming remote message; the source Task ID is read from its first data item.</param>
+ public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> remoteMessage)
{
- if (string.IsNullOrEmpty(groupName))
- {
- throw new ArgumentNullException("groupName");
- }
- if (commGroupHandler == null)
+ var nsMessage = remoteMessage.Message;
+ var gcm = nsMessage.Data.First();
+ var gcMessageTaskSource = gcm.Source;
+ TaskMessageObserver observer;
+ if (!_taskMessageObservers.TryGetValue(gcMessageTaskSource, out observer))
{
- throw new ArgumentNullException("commGroupHandler");
+ throw new KeyNotFoundException("Unable to find registered TaskMessageObserver for source Task " +
+ gcMessageTaskSource + ".");
}
- _commGroupHandlers[groupName] = commGroupHandler;
+ _registeredNodes.GetOrAdd(gcMessageTaskSource,
+ id =>
+ {
+ observer.OnNext(remoteMessage);
+ return new byte();
+ });
}
public void OnError(Exception error)
@@ -104,4 +106,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
{
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
new file mode 100644
index 0000000..00ca1d4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs
@@ -0,0 +1,80 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// An observer for a node in a group communication graph.
+ /// </summary>
+ internal sealed class NodeMessageObserver<T> : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
+ {
+ private readonly NodeStruct<T> _nodeStruct;
+
+ internal NodeMessageObserver(NodeStruct<T> nodeStruct)
+ {
+ _nodeStruct = nodeStruct;
+ }
+
+ /// <summary>
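+ /// Adds every <see cref="GroupCommunicationMessage{T}"/> item that carries data to the node; other items are skipped.
+ /// </summary>
+ /// <param name="value">The network service message whose data items are inspected.</param>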
+ public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value)
+ {
+ foreach (var data in value.Data)
+ {
+ var gcMessage = data as GroupCommunicationMessage<T>;
+ if (gcMessage != null && gcMessage.Data != null && gcMessage.Data.Length > 0)
+ {
+ _nodeStruct.AddData(gcMessage);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets the group name of the node.
+ /// </summary>
+ public string GroupName
+ {
+ get { return _nodeStruct.GroupName; }
+ }
+
+ /// <summary>
+ /// Gets the operator name of the node.
+ /// </summary>
+ public string OperatorName
+ {
+ get { return _nodeStruct.OperatorName; }
+ }
+
+ public void OnError(Exception error)
+ {
+ // TODO[JIRA REEF-1407]: Cancel on queue of node and handle error in application layer.
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ // TODO[JIRA REEF-1407]: Complete adding on queue of node.
+ throw new NotImplementedException();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
new file mode 100644
index 0000000..884959f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs
@@ -0,0 +1,120 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// An identifier for a given node in the group communication graph.
+ /// A node is uniquely identifiable by a combination of its Task ID,
+ /// <see cref="Type"/>, <see cref="GroupName"/>, and <see cref="OperatorName"/>.
+ /// </summary>
+ internal sealed class NodeObserverIdentifier
+ {
+ private readonly Type _type;
+ private readonly string _groupName;
+ private readonly string _operatorName;
+
+ /// <summary>
+ /// Creates a NodeObserverIdentifier from an observer.
+ /// </summary>
+ public static NodeObserverIdentifier FromObserver<T>(NodeMessageObserver<T> observer)
+ {
+ return new NodeObserverIdentifier(typeof(T), observer.GroupName, observer.OperatorName);
+ }
+
+ /// <summary>
+ /// Creates a NodeObserverIdentifier from a group communication message.
+ /// </summary>
+ public static NodeObserverIdentifier FromMessage(GeneralGroupCommunicationMessage message)
+ {
+ return new NodeObserverIdentifier(message.MessageType, message.GroupName, message.OperatorName);
+ }
+
+ private NodeObserverIdentifier(Type type, string groupName, string operatorName)
+ {
+ _type = type;
+ _groupName = groupName;
+ _operatorName = operatorName;
+ }
+
+ /// <summary>
+ /// The Type of the node's messages.
+ /// </summary>
+ public Type Type
+ {
+ get { return _type; }
+ }
+
+ /// <summary>
+ /// The group name of the node.
+ /// </summary>
+ public string GroupName
+ {
+ get { return _groupName; }
+ }
+
+ /// <summary>
+ /// The operator name of the node.
+ /// </summary>
+ public string OperatorName
+ {
+ get { return _operatorName; }
+ }
+
+ /// <summary>
+ /// Overrides <see cref="Equals"/>. Simply compares equivalence of instance fields.
+ /// </summary>
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ return obj is NodeObserverIdentifier && Equals((NodeObserverIdentifier)obj);
+ }
+
+ /// <summary>
+ /// Overrides <see cref="GetHashCode"/>. Generates hashcode based on the instance fields.
+ /// </summary>
+ public override int GetHashCode()
+ {
+ int hash = 17;
+ hash = (hash * 31) + _type.GetHashCode();
+ hash = (hash * 31) + _groupName.GetHashCode();
+ return (hash * 31) + _operatorName.GetHashCode();
+ }
+
+ /// <summary>
+ /// Compare equality of instance fields.
+ /// </summary>
+ private bool Equals(NodeObserverIdentifier other)
+ {
+ return _type == other.Type &&
+ _groupName.Equals(other.GroupName) &&
+ _operatorName.Equals(other.OperatorName);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index e13d724..2140c61 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -33,9 +33,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Creates a new NodeStruct.
/// </summary>
/// <param name="id">The Task identifier</param>
- internal NodeStruct(string id)
+ /// <param name="groupName">The group name of the node.</param>
+ /// <param name="operatorName">The operator name of the node</param>
+ internal NodeStruct(string id, string groupName, string operatorName)
{
Identifier = id;
+ GroupName = groupName;
+ OperatorName = operatorName;
_messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>();
}
@@ -46,6 +50,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
internal string Identifier { get; private set; }
/// <summary>
+ /// The group name of the node.
+ /// </summary>
+ internal string GroupName { get; private set; }
+
+ /// <summary>
+ /// The operator name of the node.
+ /// </summary>
+ internal string OperatorName { get; private set; }
+
+ /// <summary>
/// Gets the first message in the message queue.
/// </summary>
/// <returns>The first available message.</returns>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index bf63af4..66faa29 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Communication Group.
/// </summary>
/// <typeparam name="T">The message type</typeparam>
- public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
+ public sealed class OperatorTopology<T> : IOperatorTopology<T>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
@@ -66,6 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="sleepTime">Sleep time between retry wating for registration</param>
/// <param name="rootId">The identifier for the root Task in the topology graph</param>
/// <param name="childIds">The set of child Task identifiers in the topology graph</param>
+ /// <param name="networkObserver">The observer with which the observers for the parent and child nodes are registered</param>
/// <param name="networkService">The network service</param>
/// <param name="sender">The Sender used to do point to point communication</param>
[Inject]
@@ -78,6 +79,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
[Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
+ GroupCommNetworkObserver networkObserver,
StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
Sender sender)
{
@@ -89,12 +91,20 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
_sleepTime = sleepTime;
_nameClient = networkService.NamingClient;
_sender = sender;
+ _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId, groupName, operatorName);
- _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId);
+ // Register the observers for Task IDs and nodes adjacent to the current node
+ // in the group communication graph.
+ if (_parent != null)
+ {
+ networkObserver.RegisterAndGetForTask(_parent.Identifier).RegisterNodeObserver(new NodeMessageObserver<T>(_parent));
+ }
foreach (var childId in childIds)
{
- _childNodeContainer.PutNode(new NodeStruct<T>(childId));
+ var childNode = new NodeStruct<T>(childId, groupName, operatorName);
+ _childNodeContainer.PutNode(childNode);
+ networkObserver.RegisterAndGetForTask(childId).RegisterNodeObserver(new NodeMessageObserver<T>(childNode));
}
}
@@ -123,41 +133,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
}
/// <summary>
- /// Handles the incoming GroupCommunicationMessage.
- /// Updates the sending node's message queue.
- /// </summary>
- /// <param name="gcm">The incoming message</param>
- public void OnNext(GeneralGroupCommunicationMessage gcm)
- {
- if (gcm == null)
- {
- throw new ArgumentNullException("gcm");
- }
- if (gcm.Source == null)
- {
- throw new ArgumentException("Message must have a source");
- }
-
- var sourceNode = (_parent != null && _parent.Identifier == gcm.Source)
- ? _parent
- : _childNodeContainer.GetChild(gcm.Source);
-
- if (sourceNode == null)
- {
- throw new IllegalStateException("Received message from invalid task id: " + gcm.Source);
- }
-
- var message = gcm as GroupCommunicationMessage<T>;
-
- if (message == null)
- {
- throw new NullReferenceException("message passed not of type GroupCommunicationMessage");
- }
-
- sourceNode.AddData(message);
- }
-
- /// <summary>
/// Sends the message to the parent Task.
/// </summary>
/// <param name="message">The message to send</param>
@@ -308,14 +283,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren());
}
- public void OnError(Exception error)
- {
- }
-
- public void OnCompleted()
- {
- }
-
public bool HasChildren()
{
return _childNodeContainer.Count > 0;
@@ -396,4 +363,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
new file mode 100644
index 0000000..d8dd449
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs
@@ -0,0 +1,134 @@
+\ufeff// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+ /// <summary>
+ /// The observer for a Task that multiplexes to the node observers associated with that Task.
+ /// </summary>
+ internal sealed class TaskMessageObserver :
+ IObserver<NsMessage<GeneralGroupCommunicationMessage>>,
+ IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>
+ {
+ private readonly Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>> _observers =
+ new Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>>();
+
+ private readonly StreamingNetworkService<GeneralGroupCommunicationMessage> _networkService;
+ private readonly object _registrationLock = new object();
+ private bool _hasRegistered = false;
+ private volatile NsMessage<GeneralGroupCommunicationMessage> _registrationMessage;
+
+ public TaskMessageObserver(StreamingNetworkService<GeneralGroupCommunicationMessage> networkService)
+ {
+ _networkService = networkService;
+ }
+
+ /// <summary>
+ /// Registers a node associated with the Task.
+ /// </summary>
+ public void RegisterNodeObserver<T>(NodeMessageObserver<T> observer)
+ {
+ _observers.Add(NodeObserverIdentifier.FromObserver(observer), observer);
+ }
+
+ /// <summary>
+ /// This is called directly from the observer container with the registered IPEndpoint
+ /// of the Task ID.
+ /// </summary>
+ public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value)
+ {
+ Handle(value);
+ }
+
+ /// <summary>
+ /// This is called from the universal observer in ObserverContainer for the first message.
+ /// </summary>
+ public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> value)
+ {
+ // Lock to prevent duplication of messages.
+ lock (_registrationLock)
+ {
+ if (_hasRegistered)
+ {
+ return;
+ }
+
+ var socketRemoteId = value.Identifier as SocketRemoteIdentifier;
+ if (socketRemoteId == null)
+ {
+ throw new InvalidOperationException("Expected the remote identifier of the message to be a SocketRemoteIdentifier.");
+ }
+
+ // Handle the message first, then register the observer.
+ Handle(value.Message, true);
+ _networkService.RemoteManager.RegisterObserver(socketRemoteId.Addr, this);
+ _hasRegistered = true;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ // TODO[JIRA REEF-1407]: Propagate Exception to nodes associated with the Task.
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ // TODO[JIRA REEF-1407]: Propagate completion to nodes associated with the Task.
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Handles the group communication message.
+ /// </summary>
+ private void Handle(NsMessage<GeneralGroupCommunicationMessage> value, bool isRegistration = false)
+ {
+ // This mainly guards against ObserverContainer triggering both the universal
+ // and the endpoint handlers for a single message.
+ if (isRegistration)
+ {
+ // Process the registration message
+ _registrationMessage = value;
+ }
+ else if (_registrationMessage != null && value == _registrationMessage)
+ {
+ // This means that we've already processed the message.
+ // Ignore this message and discard the reference.
+ _registrationMessage = null;
+ return;
+ }
+
+ var gcMessage = value.Data.First();
+
+ IObserver<NsMessage<GeneralGroupCommunicationMessage>> observer;
+ if (!_observers.TryGetValue(NodeObserverIdentifier.FromMessage(gcMessage), out observer))
+ {
+ throw new InvalidOperationException("No node observer registered for group " + gcMessage.GroupName +
+ ", operator " + gcMessage.OperatorName + ", message type " + gcMessage.MessageType + ".");
+ }
+ observer.OnNext(value);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
index 9536d39..28208db 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
@@ -18,6 +18,7 @@
using System;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
namespace Org.Apache.REEF.Network.NetworkService
{
@@ -33,6 +34,11 @@ namespace Org.Apache.REEF.Network.NetworkService
INameClient NamingClient { get; }
/// <summary>
+ /// The remote manager of the NetworkService.
+ /// </summary>
+ IRemoteManager<NsMessage<T>> RemoteManager { get; }
+
+ /// <summary>
/// Open a new connection to the remote host registered to
/// the name service with the given identifier
/// </summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
index bd0c94b..53a74b4 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -81,6 +81,14 @@ namespace Org.Apache.REEF.Network.NetworkService
public INameClient NamingClient { get; private set; }
/// <summary>
+ /// The remote manager of the network service.
+ /// </summary>
+ public IRemoteManager<NsMessage<T>> RemoteManager
+ {
+ get { return _remoteManager; }
+ }
+
+ /// <summary>
/// Open a new connection to the remote host registered to
/// the name service with the given identifier
/// </summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
index cfee235..a34e8cb 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -22,13 +22,10 @@ using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Network.NetworkService.Codec;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.StreamingCodec;
-using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Network.NetworkService
{
@@ -42,37 +39,78 @@ namespace Org.Apache.REEF.Network.NetworkService
private readonly IRemoteManager<NsMessage<T>> _remoteManager;
private IIdentifier _localIdentifier;
- private readonly IDisposable _messageHandlerDisposable;
+ private readonly IDisposable _universalObserverDisposable;
+ private readonly IDisposable _remoteMessageUniversalObserver;
private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
private readonly INameClient _nameClient;
/// <summary>
/// Create a new Writable NetworkService.
/// </summary>
- /// <param name="messageHandler">The observer to handle incoming messages</param>
- /// <param name="idFactory">The factory used to create IIdentifiers</param>
+ /// <param name="universalObserver">The observer to handle incoming messages</param>
/// <param name="nameClient">The name client used to register Ids</param>
- /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a
- /// Writable RemoteManager</param>
+ /// <param name="remoteManagerFactory">
+ /// Writable RemoteManagerFactory to create a Writable RemoteManager
+ /// </param>
/// <param name="codec">Codec for Network Service message</param>
/// <param name="localAddressProvider">The local address provider</param>
- /// <param name="injector">Fork of the injector that created the Network service</param>
[Inject]
private StreamingNetworkService(
- IObserver<NsMessage<T>> messageHandler,
- IIdentifierFactory idFactory,
+ IObserver<NsMessage<T>> universalObserver,
INameClient nameClient,
StreamingRemoteManagerFactory remoteManagerFactory,
NsMessageStreamingCodec<T> codec,
- ILocalAddressProvider localAddressProvider,
- IInjector injector)
+ ILocalAddressProvider localAddressProvider)
+ : this(universalObserver, null, nameClient, remoteManagerFactory, codec, localAddressProvider)
+ {
+ }
+
+ /// <summary>
+ /// Create a new Writable NetworkService
+ /// </summary>
+ /// <param name="remoteMessageUniversalObserver">The observer to handle incoming messages</param>
+ /// <param name="nameClient">The name client used to register Ids</param>
+ /// <param name="remoteManagerFactory">
+ /// Writable RemoteManagerFactory to create a Writable RemoteManager
+ /// </param>
+ /// <param name="codec">Codec for Network Service message</param>
+ /// <param name="localAddressProvider">The local address provider</param>
+ [Inject]
+ private StreamingNetworkService(
+ IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver,
+ INameClient nameClient,
+ StreamingRemoteManagerFactory remoteManagerFactory,
+ NsMessageStreamingCodec<T> codec,
+ ILocalAddressProvider localAddressProvider)
+ : this(null, remoteMessageUniversalObserver, nameClient, remoteManagerFactory, codec, localAddressProvider)
+ {
+ }
+
+ [Inject]
+ private StreamingNetworkService(
+ IObserver<NsMessage<T>> universalObserver,
+ IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver,
+ INameClient nameClient,
+ StreamingRemoteManagerFactory remoteManagerFactory,
+ NsMessageStreamingCodec<T> codec,
+ ILocalAddressProvider localAddressProvider)
{
_remoteManager = remoteManagerFactory.GetInstance(localAddressProvider.LocalAddress, codec);
- // Create and register incoming message handler
- // TODO[REEF-419] This should use the TcpPortProvider mechanism
- var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
- _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+ if (universalObserver != null)
+ {
+ // Create and register incoming message handler
+ // TODO[REEF-419] This should use the TcpPortProvider mechanism
+ var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+ _universalObserverDisposable = _remoteManager.RegisterObserver(anyEndpoint, universalObserver);
+ }
+ else
+ {
+ _universalObserverDisposable = null;
+ }
+
+ _remoteMessageUniversalObserver = remoteMessageUniversalObserver != null ?
+ _remoteManager.RegisterObserver(remoteMessageUniversalObserver) : null;
_nameClient = nameClient;
_connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
@@ -89,6 +127,14 @@ namespace Org.Apache.REEF.Network.NetworkService
}
/// <summary>
+ /// RemoteManager for registering Observers.
+ /// </summary>
+ public IRemoteManager<NsMessage<T>> RemoteManager
+ {
+ get { return _remoteManager; }
+ }
+
+ /// <summary>
/// Open a new connection to the remote host registered to
/// the name service with the given identifier
/// </summary>
@@ -141,8 +187,18 @@ namespace Org.Apache.REEF.Network.NetworkService
}
NamingClient.Unregister(_localIdentifier.ToString());
+
_localIdentifier = null;
- _messageHandlerDisposable.Dispose();
+
+ if (_universalObserverDisposable != null)
+ {
+ _universalObserverDisposable.Dispose();
+ }
+
+ if (_remoteMessageUniversalObserver != null)
+ {
+ _remoteMessageUniversalObserver.Dispose();
+ }
}
/// <summary>
@@ -156,4 +212,4 @@ namespace Org.Apache.REEF.Network.NetworkService
Logger.Log(Level.Verbose, "Disposed of network service");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 6472597..c53027d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -87,6 +87,9 @@ under the License.
<Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
<Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
<Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />
+ <Compile Include="Group\Task\Impl\NodeMessageObserver.cs" />
+ <Compile Include="Group\Task\Impl\NodeObserverIdentifier.cs" />
+ <Compile Include="Group\Task\Impl\TaskMessageObserver.cs" />
<Compile Include="Group\Task\IOperatorTopology.cs" />
<Compile Include="Group\Operators\IReduceFunction.cs" />
<Compile Include="Group\Operators\IReduceReceiver.cs" />
@@ -98,11 +101,8 @@ under the License.
<Compile Include="Group\Pipelining\PipelineMessage.cs" />
<Compile Include="Group\Pipelining\PipelineMessageCodec.cs" />
<Compile Include="Group\Task\ICommunicationGroupClient.cs" />
- <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" />
<Compile Include="Group\Task\IGroupCommClient.cs" />
- <Compile Include="Group\Task\IGroupCommNetworkObserver.cs" />
<Compile Include="Group\Task\Impl\CommunicationGroupClient.cs" />
- <Compile Include="Group\Task\Impl\CommunicationGroupNetworkObserver.cs" />
<Compile Include="Group\Task\Impl\GroupCommClient.cs" />
<Compile Include="Group\Task\Impl\GroupCommNetworkObserver.cs" />
<Compile Include="Group\Task\Impl\NodeStruct.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
index 857e87c..cc4c57b 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
@@ -25,12 +25,13 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <summary>
/// Stores registered IObservers for DefaultRemoteManager.
/// Can register and look up IObservers by remote IPEndPoint.
+ /// TODO[JIRA REEF-1407]: Remove <see cref="IObserver{T}"/> and add custom OnError/OnCompleted with IPEndPoints.
/// </summary>
internal sealed class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
{
private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
- private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
private IObserver<T> _universalObserver;
+ private IObserver<IRemoteMessage<T>> _remoteMessageUniversalObserver;
/// <summary>
/// Constructs a new ObserverContainer used to manage remote IObservers.
@@ -38,7 +39,6 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
public ObserverContainer()
{
_endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
- _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
}
/// <summary>
@@ -67,8 +67,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <returns>An IDisposable used to unregister the observer with</returns>
public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
{
- _typeMap[typeof(T)] = observer;
- return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+ _remoteMessageUniversalObserver = observer;
+ return Disposable.Create(() => _remoteMessageUniversalObserver = null);
}
/// <summary>
@@ -84,25 +84,26 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
T value = remoteEvent.Value;
bool handled = false;
- IObserver<T> observer1;
- IObserver<IRemoteMessage<T>> observer2;
if (_universalObserver != null)
{
_universalObserver.OnNext(value);
handled = true;
}
- if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
- {
- // IObserver was registered by IPEndpoint
- observer1.OnNext(value);
- handled = true;
- }
- else if (_typeMap.TryGetValue(value.GetType(), out observer2))
+
+ if (_remoteMessageUniversalObserver != null)
{
// IObserver was registered as the universal IRemoteMessage observer (no longer keyed by event type)
IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
- observer2.OnNext(remoteMessage);
+ _remoteMessageUniversalObserver.OnNext(remoteMessage);
+ handled = true;
+ }
+
+ IObserver<T> observer;
+ if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer))
+ {
+ // IObserver was registered by IPEndpoint
+ observer.OnNext(value);
handled = true;
}
@@ -114,10 +115,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
public void OnError(Exception error)
{
+ // TODO[JIRA REEF-1407]: Propagate Exception upwards. May need to change signature.
}
public void OnCompleted()
{
+ // TODO[JIRA REEF-1407]: Propagate completion upwards. May need to change signature.
}
}
-}
+}
\ No newline at end of file