You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/05/22 02:59:39 UTC
incubator-reef git commit: [REEF-329] Improve Injection Communication
Group
Repository: incubator-reef
Updated Branches:
refs/heads/master b9686cd79 -> 75f25a267
[REEF-329] Improve Injection Communication Group
This PR is to improve injection in Communication Group:
* Use Fork Injector in creating CommunicationGroupClient
* Use Fork Injector in Creating GroupCommOperators
* Use configuration data bound at driver side to inject GroupCommOperators at
task side instead of letting clients to decide what operator instance to
inject
* Resolved race condition issue caused by delayed message handler registration
JIRA:
[REEF-329](https://issues.apache.org/jira/browse/REEF-329)
Pull Request:
This closes #187
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/75f25a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/75f25a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/75f25a26
Branch: refs/heads/master
Commit: 75f25a267ae9e597944e79c79e179e5b544d980b
Parents: b9686cd
Author: Julia Wang <jw...@yahoo.com>
Authored: Fri May 8 18:37:27 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 21 17:57:40 2015 -0700
----------------------------------------------------------------------
.../GroupCommunicationTests.cs | 204 ++++++++++++++++++-
.../Org.Apache.REEF.Network.Tests.csproj | 14 +-
.../Config/GroupCommConfigurationOptions.cs | 10 +
.../Driver/Impl/CommunicationGroupDriver.cs | 3 -
.../Group/Operators/Impl/BroadcastReceiver.cs | 26 +--
.../Group/Operators/Impl/BroadcastSender.cs | 17 +-
.../Group/Operators/Impl/ReduceReceiver.cs | 18 +-
.../Group/Operators/Impl/ReduceSender.cs | 13 +-
.../Group/Operators/Impl/ScatterReceiver.cs | 17 +-
.../Group/Operators/Impl/ScatterSender.cs | 18 +-
.../Group/Task/Impl/CommunicationGroupClient.cs | 78 ++-----
.../Impl/CommunicationGroupNetworkObserver.cs | 35 +---
.../Group/Task/Impl/GroupCommClient.cs | 27 +--
.../Group/Topology/FlatTopology.cs | 17 ++
.../Group/Topology/TreeTopology.cs | 18 +-
.../Injection/TestInjection.cs | 82 ++++++++
16 files changed, 437 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 7a6b5c1..53b8cb5 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -20,6 +20,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Globalization;
using System.Linq;
using System.Net;
using System.Reactive;
@@ -29,12 +30,14 @@ using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Examples.MachineLearning.KMeans;
using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
using Org.Apache.REEF.Network.Group.Codec;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Topology;
@@ -107,7 +110,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
int fanOut = 2;
var groupCommunicationDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
ICommunicationGroupDriver commGroup = groupCommunicationDriver.DefaultGroup
.AddBroadcast<int>(
broadcastOperatorName,
@@ -155,7 +157,76 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
Assert.AreEqual(sum, expected);
}
}
-
+
+ /// <summary>
+ /// This is to test operator injection in CommunicationGroupClient with int[] as message type
+ /// </summary>
+ [TestMethod]
+ public void TestGetBroadcastReduceOperatorsForIntArrayMessageType()
+ {
+ const string groupName = "group1";
+ const string broadcastOperatorName = "broadcast";
+ const string reduceOperatorName = "reduce";
+ const string masterTaskId = "task0";
+ const string driverId = "Driver Id";
+ const int numTasks = 3;
+ const int fanOut = 2;
+
+ IConfiguration codecConfig = CodecConfiguration<int[]>.Conf
+ .Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class)
+ .Build();
+
+ IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
+ .Set(ReduceFunctionConfiguration<int[]>.ReduceFunction, GenericType<ArraySumFunction>.Class)
+ .Build();
+
+ IConfiguration dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
+ PipelineDataConverterConfiguration<int[]>.Conf
+ .Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
+ GenericType<PipelineIntDataConverter>.Class)
+ .Build())
+ .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+ GenericType<GroupTestConfig.ChunkSize>.Class,
+ GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+ .Build();
+
+ var groupCommunicationDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+ ICommunicationGroupDriver commGroup = groupCommunicationDriver.DefaultGroup
+ .AddBroadcast<int[]>(
+ broadcastOperatorName,
+ masterTaskId,
+ TopologyTypes.Flat,
+ codecConfig,
+ dataConverterConfig)
+ .AddReduce<int[]>(
+ reduceOperatorName,
+ masterTaskId,
+ TopologyTypes.Flat,
+ codecConfig,
+ dataConverterConfig,
+ reduceFunctionConfig)
+ .Build();
+
+ var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup);
+
+ //for master task
+ IBroadcastSender<int[]> broadcastSender = commGroups[0].GetBroadcastSender<int[]>(broadcastOperatorName);
+ IReduceReceiver<int[]> sumReducer = commGroups[0].GetReduceReceiver<int[]>(reduceOperatorName);
+
+ IBroadcastReceiver<int[]> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int[]>(broadcastOperatorName);
+ IReduceSender<int[]> triangleNumberSender1 = commGroups[1].GetReduceSender<int[]>(reduceOperatorName);
+
+ IBroadcastReceiver<int[]> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int[]>(broadcastOperatorName);
+ IReduceSender<int[]> triangleNumberSender2 = commGroups[2].GetReduceSender<int[]>(reduceOperatorName);
+
+ Assert.IsNotNull(broadcastSender);
+ Assert.IsNotNull(sumReducer);
+ Assert.IsNotNull(broadcastReceiver1);
+ Assert.IsNotNull(triangleNumberSender1);
+ Assert.IsNotNull(broadcastReceiver2);
+ Assert.IsNotNull(triangleNumberSender2);
+ }
+
[TestMethod]
public void TestScatterReduceOperators()
{
@@ -710,7 +781,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
return groupCommDriver;
}
- public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IGroupCommDriver groupCommDriver, ICommunicationGroupDriver commGroup)
+ public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IGroupCommDriver groupCommDriver, ICommunicationGroupDriver commGroupDriver)
{
List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>();
IConfiguration serviceConfig = groupCommDriver.GetServiceConfiguration();
@@ -725,17 +796,24 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
.Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
.Build())
.Build();
- commGroup.AddTask(taskId);
+ commGroupDriver.AddTask(taskId);
partialConfigs.Add(partialTaskConfig);
}
for (int i = 0; i < numTasks; i++)
{
+ //get task configuration at driver side
string taskId = "task" + i;
IConfiguration groupCommTaskConfig = groupCommDriver.GetGroupCommTaskConfiguration(taskId);
IConfiguration mergedConf = Configurations.Merge(groupCommTaskConfig, partialConfigs[i], serviceConfig);
- IInjector injector = TangFactory.GetTang().NewInjector(mergedConf);
+ var conf = TangFactory.GetTang()
+ .NewConfigurationBuilder(mergedConf)
+ .BindNamedParameter(typeof(GroupCommConfigurationOptions.Initialize), "false")
+ .Build();
+ IInjector injector = TangFactory.GetTang().NewInjector(conf);
+
+ //simulate injection at evaluator side
IGroupCommClient groupCommClient = injector.GetInstance<IGroupCommClient>();
commGroups.Add(groupCommClient.GetCommunicationGroup(groupName));
}
@@ -821,4 +899,120 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
throw new NotImplementedException();
}
}
+
+ class ArraySumFunction : IReduceFunction<int[]>
+ {
+ [Inject]
+ private ArraySumFunction()
+ {
+ }
+
+ public int[] Reduce(IEnumerable<int[]> elements)
+ {
+ int[] result = null;
+ int count = 0;
+
+ foreach (var element in elements)
+ {
+ if (count == 0)
+ {
+ result = element.Clone() as int[];
+ }
+ else
+ {
+ if (element.Length != result.Length)
+ {
+ throw new Exception("integer arrays are of different sizes");
+ }
+
+ for (int i = 0; i < result.Length; i++)
+ {
+ result[i] += element[i];
+ }
+ }
+ count++;
+ }
+
+ return result;
+ }
+ }
+
+ class IntArrayCodec : ICodec<int[]>
+ {
+ [Inject]
+ private IntArrayCodec()
+ {
+ }
+
+ public byte[] Encode(int[] obj)
+ {
+ byte[] result = new byte[sizeof(Int32) * obj.Length];
+ Buffer.BlockCopy(obj, 0, result, 0, result.Length);
+ return result;
+ }
+
+ public int[] Decode(byte[] data)
+ {
+ if (data.Length % sizeof(Int32) != 0)
+ {
+ throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
+ }
+
+ int[] result = new int[data.Length / sizeof(Int32)];
+ Buffer.BlockCopy(data, 0, result, 0, data.Length);
+ return result;
+ }
+ }
+
+ class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+ {
+ readonly int _chunkSize;
+
+ [Inject]
+ private PipelineIntDataConverter([Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize)
+ {
+ _chunkSize = chunkSize;
+ }
+
+ public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
+ {
+ List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
+ int totalChunks = message.Length / _chunkSize;
+
+ if (message.Length % _chunkSize != 0)
+ {
+ totalChunks++;
+ }
+
+ int counter = 0;
+ for (int i = 0; i < message.Length; i += _chunkSize)
+ {
+ int[] data = new int[Math.Min(_chunkSize, message.Length - i)];
+ Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int));
+
+ messageList.Add(counter == totalChunks - 1
+ ? new PipelineMessage<int[]>(data, true)
+ : new PipelineMessage<int[]>(data, false));
+
+ counter++;
+ }
+
+ return messageList;
+ }
+
+ public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
+ {
+ int size = pipelineMessage.Select(x => x.Data.Length).Sum();
+ int[] data = new int[size];
+ int offset = 0;
+
+ foreach (var message in pipelineMessage)
+ {
+ Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int));
+ offset += message.Data.Length * sizeof(int);
+ }
+
+ return data;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index cb2019b..eebfdaf 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -59,23 +59,27 @@ under the License.
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
<Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
<Name>Org.Apache.REEF.Common</Name>
</ProjectReference>
- <ProjectReference Include="..\Org.Apache.REEF.Examples\Org.Apache.REEF.Examples.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Examples\Org.Apache.REEF.Examples.csproj">
<Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
<Name>Org.Apache.REEF.Examples</Name>
</ProjectReference>
- <ProjectReference Include="..\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network.Examples\Org.Apache.REEF.Network.Examples.csproj">
+ <Project>{b1b43b60-ddd0-4805-a9b4-ba84a0ccb7c7}</Project>
+ <Name>Org.Apache.REEF.Network.Examples</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
<Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
<Name>Org.Apache.REEF.Network</Name>
</ProjectReference>
- <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
<Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
<Name>Org.Apache.REEF.Tang</Name>
</ProjectReference>
- <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
<Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
<Name>Org.Apache.REEF.Wake</Name>
</ProjectReference>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
index f86c546..50c0dd6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
@@ -98,5 +98,15 @@ namespace Org.Apache.REEF.Network.Group.Config
public class TopologyChildTaskIds : Name<ISet<string>>
{
}
+
+ [NamedParameter("Type of the message")]
+ public class MessageType : Name<string>
+ {
+ }
+
+ [NamedParameter("Wether or not to call topology initialize", defaultValue: "true")]
+ public class Initialize : Name<bool>
+ {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 89238f8..07581ba 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -316,9 +316,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
var innerConf =
TangFactory.GetTang().NewConfigurationBuilder(GetOperatorConfiguration(operatorName, taskId))
- .BindNamedParameter<GroupCommConfigurationOptions.DriverId, string>(
- GenericType<GroupCommConfigurationOptions.DriverId>.Class,
- _driverId)
.BindNamedParameter<GroupCommConfigurationOptions.OperatorName, string>(
GenericType<GroupCommConfigurationOptions.OperatorName>.Class,
operatorName)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index 75ab88e..65ed6b9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -36,23 +36,24 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
public class BroadcastReceiver<T> : IBroadcastReceiver<T>
{
private const int PipelineVersion = 2;
- private readonly ICommunicationGroupNetworkObserver _networkHandler;
private readonly OperatorTopology<PipelineMessage<T>> _topology;
private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiver<T>));
+
/// <summary>
/// Creates a new BroadcastReceiver.
/// </summary>
/// <param name="operatorName">The operator identifier</param>
- /// <param name="groupName">The name of the CommunicationGroup that the
- /// operator belongs to</param>
+ /// <param name="groupName">The name of the CommunicationGroup that the operator belongs to</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The node's topology graph</param>
/// <param name="networkHandler">The incoming message handler</param>
- /// <param name="dataConverter">The converter used to convert original
- /// message to pipelined ones and vice versa.</param>
+ /// <param name="dataConverter">The converter used to convert original message to pipelined ones and vice versa.</param>
[Inject]
- public BroadcastReceiver(
+ private BroadcastReceiver(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
ICommunicationGroupNetworkObserver networkHandler,
IPipelineDataConverter<T> dataConverter)
@@ -60,15 +61,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
OperatorName = operatorName;
GroupName = groupName;
Version = PipelineVersion;
-
- _networkHandler = networkHandler;
+ PipelineDataConverter = dataConverter;
_topology = topology;
- _topology.Initialize();
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
- _networkHandler.Register(operatorName, msgHandler);
+ networkHandler.Register(operatorName, msgHandler);
- PipelineDataConverter = dataConverter;
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
/// <summary>
@@ -91,7 +93,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// </summary>
public IPipelineDataConverter<T> PipelineDataConverter { get; private set; }
-
/// <summary>
/// Receive a message from parent BroadcastSender.
/// </summary>
@@ -115,6 +116,5 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
return PipelineDataConverter.FullMessage(messageList);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index 21701ea..34f9cd2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -45,31 +45,36 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="operatorName">The identifier for the operator</param>
/// <param name="groupName">The name of the CommunicationGroup that the operator
/// belongs to</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The node's topology graph</param>
/// <param name="networkHandler">The incoming message handler</param>
/// <param name="dataConverter">The converter used to convert original
/// message to pipelined ones and vice versa.</param>
[Inject]
- public BroadcastSender(
+ private BroadcastSender(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
ICommunicationGroupNetworkObserver networkHandler,
IPipelineDataConverter<T> dataConverter)
{
+ _topology = topology;
OperatorName = operatorName;
GroupName = groupName;
Version = PipelineVersion;
-
- _topology = topology;
- _topology.Initialize();
+ PipelineDataConverter = dataConverter;
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
networkHandler.Register(operatorName, msgHandler);
- PipelineDataConverter = dataConverter;
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
-
+
/// <summary>
/// Returns the identifier for the Group Communication operator.
/// </summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index 2242368..0c2fd94 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <typeparam name="T">The message type</typeparam>
public class ReduceReceiver<T> : IReduceReceiver<T>
{
- private static readonly Logger Logger = Logger.GetLogger(typeof (ReduceReceiver<T>));
+ private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceReceiver<T>));
private const int PipelineVersion = 2;
private readonly OperatorTopology<PipelineMessage<T>> _topology;
private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
@@ -45,15 +45,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// </summary>
/// <param name="operatorName">The name of the reduce operator</param>
/// <param name="groupName">The name of the operator's CommunicationGroup</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The task's operator topology graph</param>
/// <param name="networkHandler">Handles incoming messages from other tasks</param>
/// <param name="reduceFunction">The class used to aggregate all incoming messages</param>
/// <param name="dataConverter">The converter used to convert original
/// message to pipelined ones and vice versa.</param>
[Inject]
- public ReduceReceiver(
- [Parameter(typeof (GroupCommConfigurationOptions.OperatorName))] string operatorName,
- [Parameter(typeof (GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ private ReduceReceiver(
+ [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
+ [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
ICommunicationGroupNetworkObserver networkHandler,
IReduceFunction<T> reduceFunction,
@@ -63,15 +66,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
GroupName = groupName;
Version = PipelineVersion;
ReduceFunction = reduceFunction;
+ PipelineDataConverter = dataConverter;
_pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
_topology = topology;
- _topology.Initialize();
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
networkHandler.Register(operatorName, msgHandler);
- PipelineDataConverter = dataConverter;
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index d61657f..cd2049b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -36,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <typeparam name="T">The message type</typeparam>
public class ReduceSender<T> : IReduceSender<T>
{
- private static readonly Logger Logger = Logger.GetLogger(typeof (ReduceSender<T>));
+ private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceSender<T>));
private const int PipelineVersion = 2;
private readonly OperatorTopology<PipelineMessage<T>> _topology;
private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
@@ -46,15 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// </summary>
/// <param name="operatorName">The name of the reduce operator</param>
/// <param name="groupName">The name of the reduce operator's CommunicationGroup</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The Task's operator topology graph</param>
/// <param name="networkHandler">The handler used to handle incoming messages</param>
/// <param name="reduceFunction">The function used to reduce the incoming messages</param>
/// <param name="dataConverter">The converter used to convert original
/// message to pipelined ones and vice versa.</param>
[Inject]
- public ReduceSender(
+ private ReduceSender(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<PipelineMessage<T>> topology,
ICommunicationGroupNetworkObserver networkHandler,
IReduceFunction<T> reduceFunction,
@@ -68,12 +71,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
_pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction);
_topology = topology;
- _topology.Initialize();
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
networkHandler.Register(operatorName, msgHandler);
PipelineDataConverter = dataConverter;
+
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index b40ff68..13635bb 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -35,8 +35,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
public class ScatterReceiver<T> : IScatterReceiver<T>
{
private const int DefaultVersion = 1;
-
- private readonly ICommunicationGroupNetworkObserver _networkHandler;
private readonly OperatorTopology<T> _topology;
/// <summary>
@@ -44,25 +42,30 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// </summary>
/// <param name="operatorName">The name of the scatter operator</param>
/// <param name="groupName">The name of the operator's CommunicationGroup</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The task's operator topology graph</param>
/// <param name="networkHandler">Handles incoming messages from other tasks</param>
[Inject]
- public ScatterReceiver(
+ private ScatterReceiver(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<T> topology,
ICommunicationGroupNetworkObserver networkHandler)
{
OperatorName = operatorName;
GroupName = groupName;
Version = DefaultVersion;
-
- _networkHandler = networkHandler;
_topology = topology;
- _topology.Initialize();
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
- _networkHandler.Register(operatorName, msgHandler);
+ networkHandler.Register(operatorName, msgHandler);
+
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 2c664b8..47b6f6f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using System;
using System.Collections.Generic;
using System.Reactive;
using Org.Apache.REEF.Network.Group.Config;
@@ -35,8 +36,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
public class ScatterSender<T> : IScatterSender<T>
{
private const int DefaultVersion = 1;
-
- private readonly ICommunicationGroupNetworkObserver _networkHandler;
private readonly OperatorTopology<T> _topology;
/// <summary>
@@ -44,25 +43,30 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// </summary>
/// <param name="operatorName">The name of the scatter operator</param>
/// <param name="groupName">The name of the operator's Communication Group</param>
+ /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered.
+ /// Default is true. For unit testing, it can be set to false.</param>
/// <param name="topology">The operator topology</param>
/// <param name="networkHandler">The network handler</param>
[Inject]
- public ScatterSender(
+ private ScatterSender(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
+ [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize,
OperatorTopology<T> topology,
ICommunicationGroupNetworkObserver networkHandler)
{
OperatorName = operatorName;
GroupName = groupName;
Version = DefaultVersion;
-
- _networkHandler = networkHandler;
_topology = topology;
- _topology.Initialize();
var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
- _networkHandler.Register(operatorName, msgHandler);
+ networkHandler.Register(operatorName, msgHandler);
+
+ if (initialize)
+ {
+ topology.Initialize();
+ }
}
public string OperatorName { get; private set; }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 3fcdc2f..1155048 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -19,18 +19,14 @@
using System;
using System.Collections.Generic;
-using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Network.Group.Task.Impl
@@ -41,61 +37,46 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
public class CommunicationGroupClient : ICommunicationGroupClient
{
private readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupClient));
-
- private readonly string _taskId;
- private string _driverId;
-
- private readonly Dictionary<string, IInjector> _operatorInjectors;
private readonly Dictionary<string, object> _operators;
- private readonly NetworkService<GroupCommunicationMessage> _networkService;
- private readonly IGroupCommNetworkObserver _groupCommNetworkHandler;
- private readonly ICommunicationGroupNetworkObserver _commGroupNetworkHandler;
/// <summary>
/// Creates a new CommunicationGroupClient.
/// </summary>
- /// <param name="taskId">The identifier for this Task.</param>
/// <param name="groupName">The name of the CommunicationGroup</param>
- /// <param name="driverId">The identifier for the driver</param>
/// <param name="operatorConfigs">The serialized operator configurations</param>
/// <param name="groupCommNetworkObserver">The handler for all incoming messages
/// across all Communication Groups</param>
- /// <param name="networkService">The network service used to send messages.</param>
/// <param name="configSerializer">Used to deserialize operator configuration.</param>
+ /// <param name="commGroupNetworkHandler">Communication group network observer that holds all the handlers for each operator.</param>
+ /// <param name="injector">injector forked from the injector that creates this instance</param>
[Inject]
- public CommunicationGroupClient(
- [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+ private CommunicationGroupClient(
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
- [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
[Parameter(typeof(GroupCommConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs,
IGroupCommNetworkObserver groupCommNetworkObserver,
- NetworkService<GroupCommunicationMessage> networkService,
AvroConfigurationSerializer configSerializer,
- CommunicationGroupNetworkObserver commGroupNetworkHandler)
+ ICommunicationGroupNetworkObserver commGroupNetworkHandler,
+ IInjector injector)
{
- _taskId = taskId;
- _driverId = driverId;
- GroupName = groupName;
-
_operators = new Dictionary<string, object>();
- _operatorInjectors = new Dictionary<string, IInjector>();
- _networkService = networkService;
- _groupCommNetworkHandler = groupCommNetworkObserver;
- _commGroupNetworkHandler = commGroupNetworkHandler;
- _groupCommNetworkHandler.Register(groupName, _commGroupNetworkHandler);
+ GroupName = groupName;
+ groupCommNetworkObserver.Register(groupName, commGroupNetworkHandler);
- // Deserialize operator configuration and store each injector.
- // When user requests the Group Communication Operator, use type information to
- // create the instance.
foreach (string operatorConfigStr in operatorConfigs)
- {
+ {
IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr);
- IInjector injector = TangFactory.GetTang().NewInjector(operatorConfig);
- string operatorName = injector.GetNamedInstance<GroupCommConfigurationOptions.OperatorName, string>(
+ IInjector operatorInjector = injector.ForkInjector(operatorConfig);
+ string operatorName = operatorInjector.GetNamedInstance<GroupCommConfigurationOptions.OperatorName, string>(
GenericType<GroupCommConfigurationOptions.OperatorName>.Class);
- _operatorInjectors[operatorName] = injector;
+ string msgType = operatorInjector.GetNamedInstance<GroupCommConfigurationOptions.MessageType, string>(
+ GenericType<GroupCommConfigurationOptions.MessageType>.Class);
+
+ Type groupCommOperatorGenericInterface = typeof(IGroupCommOperator<>);
+ Type groupCommOperatorInterface = groupCommOperatorGenericInterface.MakeGenericType(Type.GetType(msgType));
+ var operatorObj = operatorInjector.GetInstance(groupCommOperatorInterface);
+ _operators.Add(operatorName, operatorObj);
}
}
@@ -185,32 +166,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
{
throw new ArgumentNullException("operatorName");
}
- if (!_operatorInjectors.ContainsKey(operatorName))
- {
- throw new ArgumentException("Invalid operator name, cannot create CommunicationGroupClient");
- }
object op;
if (!_operators.TryGetValue(operatorName, out op))
{
- IInjector injector = _operatorInjectors[operatorName];
-
- injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, _taskId);
- injector.BindVolatileParameter(GenericType<GroupCommConfigurationOptions.CommunicationGroupName>.Class, GroupName);
- injector.BindVolatileInstance(GenericType<ICommunicationGroupNetworkObserver>.Class, _commGroupNetworkHandler);
- injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, _networkService);
- injector.BindVolatileInstance(GenericType<ICommunicationGroupClient>.Class, this);
-
- try
- {
- op = injector.GetInstance<T>();
- _operators[operatorName] = op;
- }
- catch (InjectionException)
- {
- LOGGER.Log(Level.Error, "Cannot inject Group Communication operator: No known operator of type: {0}", typeof(T));
- throw;
- }
+ Exceptions.Throw(new ArgumentException("Operator is not added at Driver side:" + operatorName), LOGGER);
}
return (T) op;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index 444c4a1..30dddcf 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -35,20 +35,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers;
- private readonly int _retryCount;
- private readonly int _sleepTime;
/// <summary>
/// Creates a new CommunicationGroupNetworkObserver.
/// </summary>
[Inject]
- public CommunicationGroupNetworkObserver(
- [Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForHanler))] int retryCount,
- [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForHandler))] int sleepTime)
+ public CommunicationGroupNetworkObserver()
{
_handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
- _retryCount = retryCount;
- _sleepTime = sleepTime;
}
/// <summary>
@@ -83,7 +77,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
{
string operatorName = message.OperatorName;
- IObserver<GroupCommunicationMessage> handler = GetOperatorHandler(operatorName, _retryCount, _sleepTime);
+ IObserver<GroupCommunicationMessage> handler = GetOperatorHandler(operatorName);
if (handler == null)
{
@@ -99,30 +93,15 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// GetOperatorHandler for operatorName
/// </summary>
/// <param name="operatorName"></param>
- /// <param name="retry"></param>
- /// <param name="sleepTime"></param>
/// <returns></returns>
- private IObserver<GroupCommunicationMessage> GetOperatorHandler(string operatorName, int retry, int sleepTime)
+ private IObserver<GroupCommunicationMessage> GetOperatorHandler(string operatorName)
{
- //registration of handler might be delayed while the Network Service has received message from other servers
- for (int i = 0; i < retry; i++)
+ IObserver<GroupCommunicationMessage> handler;
+ if (!_handlers.TryGetValue(operatorName, out handler))
{
- if (!_handlers.ContainsKey(operatorName))
- {
- LOGGER.Log(Level.Info, "handler for operator {0} has not been registered." + operatorName);
- Thread.Sleep(sleepTime);
- }
- else
- {
- IObserver<GroupCommunicationMessage> handler;
- if (!_handlers.TryGetValue(operatorName, out handler))
- {
- Exceptions.Throw(new ArgumentException("No handler registered yet with the operator name: " + operatorName), LOGGER);
- }
- return handler;
- }
+ Exceptions.Throw(new ApplicationException("No handler registered yet with the operator name: " + operatorName), LOGGER);
}
- return null;
+ return handler;
}
public void OnError(Exception error)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index 4f0f283..4cf0e06 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -25,15 +25,13 @@ using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote.Impl;
namespace Org.Apache.REEF.Network.Group.Task.Impl
{
/// <summary>
- /// Used by Tasks to fetch CommunicationGroupClients.
+ /// Container of ommunicationGroupClients
/// </summary>
public class GroupCommClient : IGroupCommClient
{
@@ -43,20 +41,20 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <summary>
/// Creates a new GroupCommClient and registers the task ID with the Name Server.
+ /// Currently the GroupCommClient is injected in task constructor. When work with REEF-289, we should put the injection at a proepr palce.
/// </summary>
/// <param name="groupConfigs">The set of serialized Group Communication configurations</param>
- /// <param name="taskId">The identifier for this task</param>
- /// <param name="groupCommNetworkObserver">The network handler to receive incoming messages
- /// for this task</param>
+ /// <param name="taskId">The identifier for this taskfor this task</param>
/// <param name="networkService">The network service used to send messages</param>
/// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
+ /// <param name="injector">injector forked from the injector that creates this instance</param>
[Inject]
- public GroupCommClient(
+ private GroupCommClient(
[Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
- IGroupCommNetworkObserver groupCommNetworkObserver,
NetworkService<GroupCommunicationMessage> networkService,
- AvroConfigurationSerializer configSerializer)
+ AvroConfigurationSerializer configSerializer,
+ IInjector injector)
{
_commGroups = new Dictionary<string, ICommunicationGroupClient>();
_networkService = networkService;
@@ -65,14 +63,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
foreach (string serializedGroupConfig in groupConfigs)
{
IConfiguration groupConfig = configSerializer.FromString(serializedGroupConfig);
-
- IInjector injector = TangFactory.GetTang().NewInjector(groupConfig);
- injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, taskId);
- injector.BindVolatileInstance(GenericType<IGroupCommNetworkObserver>.Class, groupCommNetworkObserver);
- injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, networkService);
-
- ICommunicationGroupClient commGroup = injector.GetInstance<ICommunicationGroupClient>();
- _commGroups[commGroup.GroupName] = commGroup;
+ IInjector groupInjector = injector.ForkInjector(groupConfig);
+ ICommunicationGroupClient commGroupClient = groupInjector.GetInstance<ICommunicationGroupClient>();
+ _commGroups[commGroupClient.GroupName] = commGroupClient;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index e80dea6..c36f1ca 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -110,10 +110,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(broadcastSpec.SenderId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+ SetMessageType(typeof(BroadcastSender<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+ SetMessageType(typeof(BroadcastReceiver<T>), confBuilder);
}
}
else if (OperatorSpec is ReduceOperatorSpec)
@@ -122,10 +124,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(reduceSpec.ReceiverId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+ SetMessageType(typeof(ReduceReceiver<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+ SetMessageType(typeof(ReduceSender<T>), confBuilder);
}
}
else if (OperatorSpec is ScatterOperatorSpec)
@@ -134,10 +138,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(scatterSpec.SenderId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+ SetMessageType(typeof(ScatterSender<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+ SetMessageType(typeof(ScatterReceiver<T>), confBuilder);
}
}
else
@@ -148,6 +154,17 @@ namespace Org.Apache.REEF.Network.Group.Topology
return Configurations.Merge(confBuilder.Build(), OperatorSpec.Configiration);
}
+ private static void SetMessageType(Type operatorType, ICsConfigurationBuilder confBuilder)
+ {
+ if (operatorType.IsGenericType)
+ {
+ var genericTypes = operatorType.GenericTypeArguments;
+ var msgType = genericTypes[0];
+ confBuilder.BindNamedParameter<GroupCommConfigurationOptions.MessageType, string>(
+ GenericType<GroupCommConfigurationOptions.MessageType>.Class, msgType.AssemblyQualifiedName);
+ }
+ }
+
/// <summary>
/// Adds a task to the topology graph.
/// </summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index e8ba6c1..d6c6bc6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -111,7 +111,6 @@ namespace Org.Apache.REEF.Network.Group.Topology
//add parentid, if no parent, add itself
ICsConfigurationBuilder confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
- //.BindImplementation(typeof(ICodec<T1>), OperatorSpec.Codec)
.BindNamedParameter<GroupCommConfigurationOptions.TopologyRootTaskId, string>(
GenericType<GroupCommConfigurationOptions.TopologyRootTaskId>.Class,
parentId);
@@ -130,10 +129,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(broadcastSpec.SenderId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+ SetMessageType(typeof(BroadcastSender<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+ SetMessageType(typeof(BroadcastReceiver<T>), confBuilder);
}
}
else if (OperatorSpec is ReduceOperatorSpec)
@@ -142,10 +143,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(reduceSpec.ReceiverId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+ SetMessageType(typeof(ReduceReceiver<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+ SetMessageType(typeof(ReduceSender<T>), confBuilder);
}
}
else if (OperatorSpec is ScatterOperatorSpec)
@@ -154,10 +157,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
if (taskId.Equals(scatterSpec.SenderId))
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+ SetMessageType(typeof(ScatterSender<T>), confBuilder);
}
else
{
confBuilder.BindImplementation(GenericType<IGroupCommOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+ SetMessageType(typeof(ScatterReceiver<T>), confBuilder);
}
}
else
@@ -233,5 +238,16 @@ namespace Org.Apache.REEF.Network.Group.Topology
_prev.Successor = node;
_prev = node;
}
+
+ private static void SetMessageType(Type operatorType, ICsConfigurationBuilder confBuilder)
+ {
+ if (operatorType.IsGenericType)
+ {
+ var genericTypes = operatorType.GenericTypeArguments;
+ var msgType = genericTypes[0];
+ confBuilder.BindNamedParameter<GroupCommConfigurationOptions.MessageType, string>(
+ GenericType<GroupCommConfigurationOptions.MessageType>.Class, msgType.AssemblyQualifiedName);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/75f25a26/lang/cs/Org.Apache.REEF.Tang.Tests/Injection/TestInjection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang.Tests/Injection/TestInjection.cs b/lang/cs/Org.Apache.REEF.Tang.Tests/Injection/TestInjection.cs
index 3fbb173..99bb297 100644
--- a/lang/cs/Org.Apache.REEF.Tang.Tests/Injection/TestInjection.cs
+++ b/lang/cs/Org.Apache.REEF.Tang.Tests/Injection/TestInjection.cs
@@ -304,6 +304,57 @@ namespace Org.Apache.REEF.Tang.Tests.Injection
Assert.IsNotNull(o.ExternalObject is ExternalClass);
}
+
+ /// <summary>
+ /// In this test, interface is a generic of T. Implementations have different generic arguments such as int and string.
+ /// When doing injection, we must specify the interface with a specified argument type
+ /// </summary>
+ [TestMethod]
+ public void TestInjectionWithGenericArguments()
+ {
+ var c = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation(GenericType<IMyOperator<int>>.Class, GenericType<MyOperatorImpl<int>>.Class)
+ .BindImplementation(GenericType<IMyOperator<string>>.Class, GenericType<MyOperatorImpl<string>>.Class)
+ .Build();
+
+ var injector = TangFactory.GetTang().NewInjector(c);
+
+ //argument type must be specified in injection
+ var o1 = injector.GetInstance(typeof(IMyOperator<int>));
+ var o2 = injector.GetInstance(typeof(IMyOperator<string>));
+ var o3 = injector.GetInstance(typeof(MyOperatorTopology<int>));
+
+ Assert.IsTrue(o1 is MyOperatorImpl<int>);
+ Assert.IsTrue(o2 is MyOperatorImpl<string>);
+ Assert.IsTrue(o3 is MyOperatorTopology<int>);
+ }
+
+ /// <summary>
+ /// In this test, interface argument type is set through Configuration. We can get the argument type and then
+ /// make the interface with the argument type on the fly so that to do the injection
+ /// </summary>
+ [TestMethod]
+ public void TestInjectionWithGenericArgumentType()
+ {
+ var c = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation(GenericType<IMyOperator<int[]>>.Class, GenericType<MyOperatorImpl<int[]>>.Class)
+ .BindNamedParameter(typeof(MessageType), typeof(int[]).AssemblyQualifiedName)
+ .Build();
+
+ var injector = TangFactory.GetTang().NewInjector(c);
+
+ //get argument type from configuration
+ var messageTypeAsString = injector.GetNamedInstance<MessageType, string>(GenericType<MessageType>.Class);
+ Type messageType = Type.GetType(messageTypeAsString);
+
+ //creat interface with generic type on the fly
+ Type genericInterfaceType = typeof(IMyOperator<>);
+ Type interfaceOfMessageType = genericInterfaceType.MakeGenericType(messageType);
+
+ var o = injector.GetInstance(interfaceOfMessageType);
+
+ Assert.IsTrue(o is MyOperatorImpl<int[]>);
+ }
}
class AReferenceClass : IAInterface
@@ -386,4 +437,35 @@ namespace Org.Apache.REEF.Tang.Tests.Injection
{
}
}
+
+ interface IMyOperator<T>
+ {
+ string OperatorName { get; }
+ }
+
+ class MyOperatorImpl<T> : IMyOperator<T>
+ {
+ [Inject]
+ public MyOperatorImpl()
+ {
+ }
+
+ string IMyOperator<T>.OperatorName
+ {
+ get { throw new NotImplementedException(); }
+ }
+ }
+
+ [NamedParameter]
+ class MessageType : Name<string>
+ {
+ }
+
+ class MyOperatorTopology<T>
+ {
+ [Inject]
+ public MyOperatorTopology(IMyOperator<T> op)
+ {
+ }
+ }
}
\ No newline at end of file