You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/13 18:24:36 UTC
[1/3] incubator-reef git commit: [REEF-245] Move Network test from
REEF.Tests to REEF.Network.Tests project
Repository: incubator-reef
Updated Branches:
refs/heads/master f44a6599b -> 987d4f37d
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/NetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/NetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/NetworkServiceTests.cs
deleted file mode 100644
index 75f07c3..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/NetworkServiceTests.cs
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class NetworkServiceTests
- {
- [TestMethod]
- public void TestNetworkServiceOneWayCommunication()
- {
- int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
- int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
- BlockingCollection<string> queue = new BlockingCollection<string>();
-
- using (INameServer nameServer = new NameServer(0))
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- int nameServerPort = endpoint.Port;
- string nameServerAddr = endpoint.Address.ToString();
- using (INetworkService<string> networkService1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, null))
- using (INetworkService<string> networkService2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, new MessageHandler(queue)))
- {
- IIdentifier id1 = new StringIdentifier("service1");
- IIdentifier id2 = new StringIdentifier("service2");
- networkService1.Register(id1);
- networkService2.Register(id2);
-
- using (IConnection<string> connection = networkService1.NewConnection(id2))
- {
- connection.Open();
- connection.Write("abc");
- connection.Write("def");
- connection.Write("ghi");
-
- Assert.AreEqual("abc", queue.Take());
- Assert.AreEqual("def", queue.Take());
- Assert.AreEqual("ghi", queue.Take());
- }
- }
- }
- }
-
- [TestMethod]
- public void TestNetworkServiceTwoWayCommunication()
- {
- int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
- int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
- BlockingCollection<string> queue1 = new BlockingCollection<string>();
- BlockingCollection<string> queue2 = new BlockingCollection<string>();
-
- using (INameServer nameServer = new NameServer(0))
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- int nameServerPort = endpoint.Port;
- string nameServerAddr = endpoint.Address.ToString();
- using (INetworkService<string> networkService1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, new MessageHandler(queue1)))
- using (INetworkService<string> networkService2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, new MessageHandler(queue2)))
- {
- IIdentifier id1 = new StringIdentifier("service1");
- IIdentifier id2 = new StringIdentifier("service2");
- networkService1.Register(id1);
- networkService2.Register(id2);
-
- using (IConnection<string> connection1 = networkService1.NewConnection(id2))
- using (IConnection<string> connection2 = networkService2.NewConnection(id1))
- {
- connection1.Open();
- connection1.Write("abc");
- connection1.Write("def");
- connection1.Write("ghi");
-
- connection2.Open();
- connection2.Write("jkl");
- connection2.Write("mno");
-
- Assert.AreEqual("abc", queue2.Take());
- Assert.AreEqual("def", queue2.Take());
- Assert.AreEqual("ghi", queue2.Take());
-
- Assert.AreEqual("jkl", queue1.Take());
- Assert.AreEqual("mno", queue1.Take());
- }
- }
- }
- }
-
- private INetworkService<string> BuildNetworkService(
- int networkServicePort,
- int nameServicePort,
- string nameServiceAddr,
- IObserver<NsMessage<string>> handler)
- {
- // Test injection
- if (handler == null)
- {
- var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder()
- .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
- GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
- networkServicePort.ToString(CultureInfo.CurrentCulture))
- .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
- GenericType<NamingConfigurationOptions.NameServerPort>.Class,
- nameServicePort.ToString(CultureInfo.CurrentCulture))
- .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
- GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
- nameServiceAddr)
- .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
- .BindImplementation(GenericType<ICodec<string>>.Class, GenericType<StringCodec>.Class)
- .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class, GenericType<NetworkMessageHandler>.Class)
- .Build();
-
- return TangFactory.GetTang().NewInjector(networkServiceConf).GetInstance<NetworkService<string>>();
- }
-
- var nameserverConf = TangFactory.GetTang().NewConfigurationBuilder()
- .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
- GenericType<NamingConfigurationOptions.NameServerPort>.Class,
- nameServicePort.ToString(CultureInfo.CurrentCulture))
- .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
- GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
- nameServiceAddr)
- .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
- .Build();
-
- var nameClient = TangFactory.GetTang().NewInjector(nameserverConf).GetInstance<NameClient>();
- return new NetworkService<string>(networkServicePort,
- handler, new StringIdentifierFactory(), new StringCodec(), nameClient);
- }
-
- private class MessageHandler : IObserver<NsMessage<string>>
- {
- private readonly BlockingCollection<string> _queue;
-
- public MessageHandler(BlockingCollection<string> queue)
- {
- _queue = queue;
- }
-
- public void OnNext(NsMessage<string> value)
- {
- _queue.Add(value.Data.First());
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
- }
-
- private class NetworkMessageHandler : IObserver<NsMessage<string>>
- {
- [Inject]
- public NetworkMessageHandler()
- {
- }
-
- public void OnNext(NsMessage<string> value)
- {
- }
-
- public void OnError(Exception error)
- {
- }
-
- public void OnCompleted()
- {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index a915f59..8cbcf61 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -73,11 +73,6 @@ under the License.
<Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceTest.cs" />
<Compile Include="Functional\MPI\ScatterReduceTest\SlaveTask.cs" />
<Compile Include="Functional\ReefFunctionalTest.cs" />
- <Compile Include="Network\BlockingCollectionExtensionTests.cs" />
- <Compile Include="Network\GroupCommunicationTests.cs" />
- <Compile Include="Network\GroupCommunicationTreeTopologyTests.cs" />
- <Compile Include="Network\NameServerTests.cs" />
- <Compile Include="Network\NetworkServiceTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utility\TestDriverConfigGenerator.cs" />
<Compile Include="Utility\TestExceptions.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.sln
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.sln b/lang/cs/Org.Apache.REEF.sln
index 8616032..69fc800 100644
Binary files a/lang/cs/Org.Apache.REEF.sln and b/lang/cs/Org.Apache.REEF.sln differ
[3/3] incubator-reef git commit: [REEF-245] Move Network test from
REEF.Tests to REEF.Network.Tests project
Posted by we...@apache.org.
[REEF-245] Move Network test from REEF.Tests to REEF.Network.Tests project
This PR adds a new test project for Network unit tests and moves
the Network unit test cases to the new project.
JIRA:
[REEF-245](https://issues.apache.org/jira/browse/REEF-245)
This closes #142
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/987d4f37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/987d4f37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/987d4f37
Branch: refs/heads/master
Commit: 987d4f37d6b7be9ef367655bd27a2c67097d2aec
Parents: f44a659
Author: Julia Wang <jw...@yahoo.com>
Authored: Thu Apr 9 18:23:20 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Apr 13 09:20:09 2015 -0700
----------------------------------------------------------------------
.../BlockingCollectionExtensionTests.cs | 74 ++
.../GroupCommunicationTests.cs | 742 +++++++++++++++++++
.../GroupCommunicationTreeTopologyTests.cs | 634 ++++++++++++++++
.../NamingService/NameServerTests.cs | 269 +++++++
.../NetworkService/NetworkServiceTests.cs | 210 ++++++
.../Org.Apache.REEF.Network.Tests.csproj | 90 +++
.../Properties/AssemblyInfo.cs | 55 ++
.../packages.config | 23 +
.../Network/BlockingCollectionExtensionTests.cs | 74 --
.../Network/GroupCommunicationTests.cs | 742 -------------------
.../GroupCommunicationTreeTopologyTests.cs | 634 ----------------
.../Network/NameServerTests.cs | 269 -------
.../Network/NetworkServiceTests.cs | 210 ------
.../Org.Apache.REEF.Tests.csproj | 5 -
lang/cs/Org.Apache.REEF.sln | Bin 20908 -> 42590 bytes
15 files changed, 2097 insertions(+), 1934 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/BlockingCollectionExtensionTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/BlockingCollectionExtensionTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/BlockingCollectionExtensionTests.cs
new file mode 100644
index 0000000..aa4ebc6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/BlockingCollectionExtensionTests.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Network.Utilities;
+
+namespace Org.Apache.REEF.Network.Tests
+{
+ [TestClass]
+ public class BlockingCollectionExtensionTests
+ {
+ [TestMethod]
+ public void TestCollectionContainsElement()
+ {
+ string item = "abc";
+ BlockingCollection<string> collection = new BlockingCollection<string>();
+ collection.Add(item);
+
+ Assert.AreEqual(item, collection.Take(item));
+
+ // Check that item is no longer in collection
+ Assert.AreEqual(0, collection.Count);
+ }
+
+ [TestMethod]
+ public void TestCollectionContainsElement2()
+ {
+ string item = "abc";
+ BlockingCollection<string> collection = new BlockingCollection<string>();
+ collection.Add("cat");
+ collection.Add(item);
+ collection.Add("dog");
+
+ Assert.AreEqual(item, collection.Take(item));
+
+ // Remove remaining items, check that item is not there
+ Assert.AreNotEqual(item, collection.Take());
+ Assert.AreNotEqual(item, collection.Take());
+ Assert.AreEqual(0, collection.Count);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void TestCollectionDoesNotContainsElement()
+ {
+ string item1 = "abc";
+ string item2 = "def";
+
+ BlockingCollection<string> collection = new BlockingCollection<string>();
+ collection.Add(item2);
+
+ // Should throw InvalidOperationException since item1 is not in collection
+ collection.Take(item1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
new file mode 100644
index 0000000..c0808d8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reactive;
+using System.Text;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Codec;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+ [TestClass]
+ public class GroupCommunicationTests
+ {
+ [TestMethod]
+ public void TestSender()
+ {
+ using (NameServer nameServer = new NameServer(0))
+ {
+ IPEndPoint endpoint = nameServer.LocalEndpoint;
+ BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>();
+ BlockingCollection<GroupCommunicationMessage> messages2 = new BlockingCollection<GroupCommunicationMessage>();
+
+ var handler1 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
+ msg => messages1.Add(msg.Data.First()));
+ var handler2 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
+ msg => messages2.Add(msg.Data.First()));
+
+ var networkService1 = BuildNetworkService(endpoint, handler1);
+ var networkService2 = BuildNetworkService(endpoint, handler2);
+
+ networkService1.Register(new StringIdentifier("id1"));
+ networkService2.Register(new StringIdentifier("id2"));
+
+ Sender sender1 = new Sender(networkService1, new StringIdentifierFactory());
+ Sender sender2 = new Sender(networkService2, new StringIdentifierFactory());
+
+ sender1.Send(CreateGcm("abc", "id1", "id2"));
+ sender1.Send(CreateGcm("def", "id1", "id2"));
+
+ sender2.Send(CreateGcm("ghi", "id2", "id1"));
+
+ string msg1 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
+ string msg2 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
+ Assert.AreEqual("abc", msg1);
+ Assert.AreEqual("def", msg2);
+
+ string msg3 = Encoding.UTF8.GetString(messages1.Take().Data[0]);
+ Assert.AreEqual("ghi", msg3);
+ }
+ }
+
+ [TestMethod]
+ public void TestBroadcastReduceOperators()
+ {
+ string groupName = "group1";
+ string broadcastOperatorName = "broadcast";
+ string reduceOperatorName = "reduce";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 3;
+ int fanOut = 2;
+
+ var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+ .AddBroadcast<int, IntCodec>(
+ broadcastOperatorName,
+ masterTaskId)
+ .AddReduce<int, IntCodec>(
+ reduceOperatorName,
+ masterTaskId,
+ new SumFunction())
+ .Build();
+
+ var commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ //for master task
+ IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
+ IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+ IBroadcastReceiver<int> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int>(broadcastOperatorName);
+ IReduceSender<int> triangleNumberSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
+
+ IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
+ IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
+
+ for (int j = 1; j <= 10; j++)
+ {
+ broadcastSender.Send(j);
+
+ int n1 = broadcastReceiver1.Receive();
+ int n2 = broadcastReceiver2.Receive();
+ Assert.AreEqual(j, n1);
+ Assert.AreEqual(j, n2);
+
+ int triangleNum1 = TriangleNumber(n1);
+ triangleNumberSender1.Send(triangleNum1);
+ int triangleNum2 = TriangleNumber(n2);
+ triangleNumberSender2.Send(triangleNum2);
+
+ int sum = sumReducer.Reduce();
+ int expected = TriangleNumber(j) * (numTasks - 1);
+ Assert.AreEqual(sum, expected);
+ }
+ }
+
+ [TestMethod]
+ public void TestScatterReduceOperators()
+ {
+ string groupName = "group1";
+ string scatterOperatorName = "scatter";
+ string reduceOperatorName = "reduce";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 5;
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+ .AddScatter<int, IntCodec>(
+ scatterOperatorName,
+ masterTaskId)
+ .AddReduce<int, IntCodec>(
+ reduceOperatorName,
+ masterTaskId,
+ new SumFunction())
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
+ IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+ IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
+ IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
+
+ IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
+ IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
+
+ IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
+ IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
+
+ IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
+ IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+ Assert.IsNotNull(receiver3);
+ Assert.IsNotNull(receiver4);
+
+ List<int> data = Enumerable.Range(1, 100).ToList();
+ List<string> order = new List<string> {"task4", "task3", "task2", "task1"};
+
+ sender.Send(data, order);
+
+ ScatterReceiveReduce(receiver4, sumSender4);
+ ScatterReceiveReduce(receiver3, sumSender3);
+ ScatterReceiveReduce(receiver2, sumSender2);
+ ScatterReceiveReduce(receiver1, sumSender1);
+
+ int sum = sumReducer.Reduce();
+
+ Assert.AreEqual(sum, data.Sum());
+ }
+
+ [TestMethod]
+ public void TestBroadcastOperator()
+ {
+ NameServer nameServer = new NameServer(0);
+
+ string groupName = "group1";
+ string operatorName = "broadcast";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 10;
+ int value = 1337;
+ int fanOut = 3;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
+ IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
+ IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+
+ sender.Send(value);
+ Assert.AreEqual(value, receiver1.Receive());
+ Assert.AreEqual(value, receiver2.Receive());
+ }
+
+ [TestMethod]
+ public void TestBroadcastOperatorWithDefaultCodec()
+ {
+ NameServer nameServer = new NameServer(0);
+
+ string groupName = "group1";
+ string operatorName = "broadcast";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 10;
+ int value = 1337;
+ int fanOut = 3;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddBroadcast(operatorName, masterTaskId)
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
+ IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
+ IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+
+ sender.Send(value);
+ Assert.AreEqual(value, receiver1.Receive());
+ Assert.AreEqual(value, receiver2.Receive());
+ }
+
+ [TestMethod]
+ public void TestBroadcastOperator2()
+ {
+ string groupName = "group1";
+ string operatorName = "broadcast";
+ string driverId = "driverId";
+ string masterTaskId = "task0";
+ int numTasks = 3;
+ int value1 = 1337;
+ int value2 = 42;
+ int value3 = 99;
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
+ IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
+ IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+
+ sender.Send(value1);
+ Assert.AreEqual(value1, receiver1.Receive());
+ Assert.AreEqual(value1, receiver2.Receive());
+
+ sender.Send(value2);
+ Assert.AreEqual(value2, receiver1.Receive());
+ Assert.AreEqual(value2, receiver2.Receive());
+
+ sender.Send(value3);
+ Assert.AreEqual(value3, receiver1.Receive());
+ Assert.AreEqual(value3, receiver2.Receive());
+ }
+
+ [TestMethod]
+ public void TestReduceOperator()
+ {
+ string groupName = "group1";
+ string operatorName = "reduce";
+ int numTasks = 4;
+ string driverId = "driverid";
+ string masterTaskId = "task0";
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
+ IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
+ IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
+ IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
+
+ Assert.IsNotNull(receiver);
+ Assert.IsNotNull(sender1);
+ Assert.IsNotNull(sender2);
+ Assert.IsNotNull(sender3);
+
+ sender3.Send(5);
+ sender1.Send(1);
+ sender2.Send(3);
+
+ Assert.AreEqual(9, receiver.Reduce());
+ }
+
+ [TestMethod]
+ public void TestReduceOperator2()
+ {
+ string groupName = "group1";
+ string operatorName = "reduce";
+ int numTasks = 4;
+ string driverId = "driverid";
+ string masterTaskId = "task0";
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
+ IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
+ IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
+ IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
+
+ Assert.IsNotNull(receiver);
+ Assert.IsNotNull(sender1);
+ Assert.IsNotNull(sender2);
+ Assert.IsNotNull(sender3);
+
+ sender3.Send(5);
+ sender1.Send(1);
+ sender2.Send(3);
+ Assert.AreEqual(9, receiver.Reduce());
+
+ sender3.Send(6);
+ sender1.Send(2);
+ sender2.Send(4);
+ Assert.AreEqual(12, receiver.Reduce());
+
+ sender3.Send(9);
+ sender1.Send(3);
+ sender2.Send(6);
+ Assert.AreEqual(18, receiver.Reduce());
+ }
+
+ [TestMethod]
+ public void TestScatterOperator()
+ {
+ string groupName = "group1";
+ string operatorName = "scatter";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 5;
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddScatter(operatorName, masterTaskId)
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
+ IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+ Assert.IsNotNull(receiver3);
+ Assert.IsNotNull(receiver4);
+
+ List<int> data = new List<int> { 1, 2, 3, 4 };
+
+ sender.Send(data);
+ Assert.AreEqual(1, receiver1.Receive().Single());
+ Assert.AreEqual(2, receiver2.Receive().Single());
+ Assert.AreEqual(3, receiver3.Receive().Single());
+ Assert.AreEqual(4, receiver4.Receive().Single());
+ }
+
+ [TestMethod]
+ public void TestScatterOperatorWithDefaultCodec()
+ {
+ string groupName = "group1";
+ string operatorName = "scatter";
+ string masterTaskId = "task0";
+ string driverId = "Driver Id";
+ int numTasks = 5;
+ int fanOut = 2;
+
+ IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+ var commGroup = mpiDriver.DefaultGroup
+ .AddScatter(operatorName, masterTaskId)
+ .Build();
+
+ List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+ IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
+ IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
+ IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
+
+ Assert.IsNotNull(sender);
+ Assert.IsNotNull(receiver1);
+ Assert.IsNotNull(receiver2);
+ Assert.IsNotNull(receiver3);
+ Assert.IsNotNull(receiver4);
+
+ List<int> data = new List<int> { 1, 2, 3, 4 };
+
+ sender.Send(data);
+ Assert.AreEqual(1, receiver1.Receive().Single());
+ Assert.AreEqual(2, receiver2.Receive().Single());
+ Assert.AreEqual(3, receiver3.Receive().Single());
+ Assert.AreEqual(4, receiver4.Receive().Single());
+ }
+
+ [TestMethod]
+ public void TestScatterOperator2()
+ {
+     // Scatter eight integers from task0 to four receivers. Each receiver's
+     // chunk is checked only by its first and last element.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 5;
+     int fanOutDegree = 2;
+
+     IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+     var groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master)
+         .Build();
+     List<ICommunicationGroupClient> clients = CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     // Client 0 is the sender; clients 1..taskCount-1 are receivers.
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+     // Receiver k (1-based) is expected to start with 2k - 1 and end with 2k.
+     for (int k = 1; k <= receivers.Count; k++)
+     {
+         var chunk = receivers[k - 1].Receive();
+         Assert.AreEqual((2 * k) - 1, chunk.First());
+         Assert.AreEqual(2 * k, chunk.Last());
+     }
+ }
+
+ [TestMethod]
+ public void TestScatterOperator3()
+ {
+     // Scatter eight integers from task0 to three receivers. Only the listed
+     // leading elements of each chunk are compared; chunk lengths are not asserted.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 4;
+     int fanOutDegree = 2;
+
+     IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+     var groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master)
+         .Build();
+     List<ICommunicationGroupClient> clients = CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+     // Expected leading elements per receiver, in task order.
+     int[][] expectedChunks =
+     {
+         new[] { 1, 2, 3 },
+         new[] { 4, 5, 6 },
+         new[] { 7, 8 }
+     };
+     for (int r = 0; r < expectedChunks.Length; r++)
+     {
+         int[] actual = receivers[r].Receive().ToArray();
+         for (int j = 0; j < expectedChunks[r].Length; j++)
+         {
+             Assert.AreEqual(expectedChunks[r][j], actual[j]);
+         }
+     }
+ }
+
+ [TestMethod]
+ public void TestScatterOperator4()
+ {
+     // Scatter eight integers with an explicit task order (task3, task2, task1)
+     // and check that the chunks arrive following that order.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 4;
+     int fanOutDegree = 2;
+
+     IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+     var groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master)
+         .Build();
+     List<ICommunicationGroupClient> clients = CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     List<int> payload = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
+     List<string> taskOrder = new List<string> { "task3", "task2", "task1" };
+     sender.Send(payload, taskOrder);
+
+     // Receivers are drained in the same order the data was addressed:
+     // task3 first, then task2, then task1.
+     var drainOrder = new[] { receivers[2], receivers[1], receivers[0] };
+     int[][] expectedChunks =
+     {
+         new[] { 1, 2, 3 },
+         new[] { 4, 5, 6 },
+         new[] { 7, 8 }
+     };
+     for (int r = 0; r < drainOrder.Length; r++)
+     {
+         int[] actual = drainOrder[r].Receive().ToArray();
+         for (int j = 0; j < expectedChunks[r].Length; j++)
+         {
+             Assert.AreEqual(expectedChunks[r][j], actual[j]);
+         }
+     }
+ }
+
+ [TestMethod]
+ public void TestConfigurationBroadcastSpec()
+ {
+     // The task configuration of a flat topology built around a broadcast spec
+     // must let Tang inject an ICodec<int> that round-trips a value.
+     var broadcastSpec = new BroadcastOperatorSpec<int, IntCodec>("Sender");
+     var flatTopology = new FlatTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid", broadcastSpec);
+
+     flatTopology.AddTask("task1");
+     var taskConf = flatTopology.GetTaskConfiguration("task1");
+
+     IInjector injector = TangFactory.GetTang().NewInjector(taskConf);
+     ICodec<int> intCodec = injector.GetInstance<ICodec<int>>();
+     Assert.AreEqual(3, intCodec.Decode(intCodec.Encode(3)));
+ }
+
+ [TestMethod]
+ public void TestConfigurationReduceSpec()
+ {
+     // The task configuration of a flat topology built around a reduce spec
+     // must let Tang inject the reduce function; with SumFunction, 1+2+3+4 is 10.
+     var reduceSpec = new ReduceOperatorSpec<int, IntCodec>("task1", new SumFunction());
+     var flatTopology = new FlatTopology<int, IntCodec>("Operator", "Group", "task1", "driverid", reduceSpec);
+
+     flatTopology.AddTask("task1");
+     var taskConf = flatTopology.GetTaskConfiguration("task1");
+
+     IInjector injector = TangFactory.GetTang().NewInjector(taskConf);
+     IReduceFunction<int> injectedFunction = injector.GetInstance<IReduceFunction<int>>();
+     Assert.AreEqual(10, injectedFunction.Reduce(new int[] { 1, 2, 3, 4 }));
+ }
+
+ /// <summary>
+ /// Builds a Tang configuration from the given MPI settings and injects an
+ /// <see cref="MpiDriver"/> from it.
+ /// </summary>
+ /// <param name="driverId">Value bound to MpiConfigurationOptions.DriverId.</param>
+ /// <param name="masterTaskId">Value bound to MpiConfigurationOptions.MasterTaskId.</param>
+ /// <param name="groupName">Value bound to MpiConfigurationOptions.GroupName.</param>
+ /// <param name="fanOut">Value bound to MpiConfigurationOptions.FanOut.</param>
+ /// <param name="numTasks">Value bound to MpiConfigurationOptions.NumberOfTasks.</param>
+ /// <returns>The injected driver instance.</returns>
+ public static IMpiDriver GetInstanceOfMpiDriver(string driverId, string masterTaskId, string groupName, int fanOut, int numTasks)
+ {
+     // Integer parameters are bound as text. Format them culture-invariantly so
+     // the bound string never depends on the machine's regional settings.
+     var invariant = System.Globalization.CultureInfo.InvariantCulture;
+
+     var c = TangFactory.GetTang().NewConfigurationBuilder()
+         .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId)
+         .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId)
+         .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName)
+         .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString(invariant))
+         .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(invariant))
+         .BindImplementation(GenericType<IConfigurationSerializer>.Class, GenericType<AvroConfigurationSerializer>.Class)
+         .Build();
+
+     IMpiDriver mpiDriver = TangFactory.GetTang().NewInjector(c).GetInstance<MpiDriver>();
+     return mpiDriver;
+ }
+
+ /// <summary>
+ /// Registers tasks "task0" .. "task{numTasks-1}" with the communication group and
+ /// returns, for each task in that order, the communication group client obtained
+ /// from an injector built for it.
+ /// </summary>
+ /// <param name="groupName">Name of the communication group to fetch from each client.</param>
+ /// <param name="numTasks">Number of tasks to create.</param>
+ /// <param name="mpiDriver">Driver supplying the service and per-task configurations.</param>
+ /// <param name="commGroup">Group driver the task ids are added to.</param>
+ /// <returns>One group client per task, indexed by task number.</returns>
+ public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IMpiDriver mpiDriver, ICommunicationGroupDriver commGroup)
+ {
+     List<ICommunicationGroupClient> clients = new List<ICommunicationGroupClient>();
+     IConfiguration serviceConf = mpiDriver.GetServiceConfiguration();
+
+     // Pass 1: add every task to the group and remember its partial task
+     // configuration. All tasks are added before any per-task configuration is
+     // requested from the driver in pass 2.
+     List<IConfiguration> taskConfs = new List<IConfiguration>();
+     for (int index = 0; index < numTasks; index++)
+     {
+         string id = "task" + index;
+         IConfiguration taskConf = TangFactory.GetTang().NewConfigurationBuilder(
+             TaskConfiguration.ConfigurationModule
+                 .Set(TaskConfiguration.Identifier, id)
+                 .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
+                 .Build())
+             .Build();
+         commGroup.AddTask(id);
+         taskConfs.Add(taskConf);
+     }
+
+     // Pass 2: merge the driver's, the task's and the service configuration,
+     // then inject an MPI client and ask it for the requested group.
+     for (int index = 0; index < numTasks; index++)
+     {
+         IConfiguration driverTaskConf = mpiDriver.GetMpiTaskConfiguration("task" + index);
+         IConfiguration merged = Configurations.Merge(driverTaskConf, taskConfs[index], serviceConf);
+         IMpiClient client = TangFactory.GetTang().NewInjector(merged).GetInstance<IMpiClient>();
+         clients.Add(client.GetCommunicationGroup(groupName));
+     }
+
+     return clients;
+ }
+
+ /// <summary>
+ /// Creates a network service for group communication messages whose name client
+ /// targets the given name server endpoint.
+ /// </summary>
+ /// <param name="nameServerEndpoint">Address and port handed to the NameClient.</param>
+ /// <param name="handler">Observer passed to the network service.</param>
+ /// <returns>The newly constructed network service.</returns>
+ public static NetworkService<GroupCommunicationMessage> BuildNetworkService(
+     IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler)
+ {
+     var identifierFactory = new StringIdentifierFactory();
+     var messageCodec = new GroupCommunicationMessageCodec();
+     var nameClient = new NameClient(nameServerEndpoint.Address.ToString(), nameServerEndpoint.Port);
+
+     // NOTE(review): the leading 0 is presumably the local port - confirm against NetworkService.
+     return new NetworkService<GroupCommunicationMessage>(
+         0, handler, identifierFactory, messageCodec, nameClient);
+ }
+
+ /// <summary>
+ /// Wraps a text message, UTF-8 encoded, in a data message for group "g1" and
+ /// operator "op1".
+ /// </summary>
+ /// <param name="message">Text to encode as the message payload.</param>
+ /// <param name="from">Source identifier passed to the message.</param>
+ /// <param name="to">Destination identifier passed to the message.</param>
+ private GroupCommunicationMessage CreateGcm(string message, string from, string to)
+ {
+     return new GroupCommunicationMessage(
+         "g1", "op1", from, to, Encoding.UTF8.GetBytes(message), MessageType.Data);
+ }
+
+ /// <summary>
+ /// Receives one scattered chunk, sums it, and sends the sum through the reduce sender.
+ /// </summary>
+ /// <param name="receiver">Scatter receiver to read the chunk from.</param>
+ /// <param name="sumSender">Reduce sender that gets the chunk's sum.</param>
+ private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender)
+ {
+     List<int> chunk = receiver.Receive();
+     sumSender.Send(chunk.Sum());
+ }
+
+ /// <summary>
+ /// Returns 1 + 2 + ... + n (0 when n is 0).
+ /// </summary>
+ /// <param name="n">Number of terms; negative values are rejected by Enumerable.Range.</param>
+ /// <returns>The n-th triangle number.</returns>
+ public static int TriangleNumber(int n)
+ {
+     int total = 0;
+     foreach (int term in Enumerable.Range(1, n))
+     {
+         // checked keeps the overflow behaviour of Enumerable.Sum.
+         total = checked(total + term);
+     }
+
+     return total;
+ }
+ }
+
+ /// <summary>
+ /// Reduce function that adds up all integers it is given.
+ /// </summary>
+ public class SumFunction : IReduceFunction<int>
+ {
+     /// <summary>
+     /// Parameterless constructor marked for Tang injection.
+     /// </summary>
+     [Inject]
+     public SumFunction()
+     {
+     }
+
+     /// <summary>
+     /// Returns the sum of <paramref name="elements"/> (0 for an empty sequence).
+     /// </summary>
+     public int Reduce(IEnumerable<int> elements)
+     {
+         // Seeded fold with checked addition: same result and overflow behaviour as Sum().
+         return elements.Aggregate(0, (running, next) => checked(running + next));
+     }
+ }
+
+ /// <summary>
+ /// Placeholder task used only so that a task configuration can be built in tests.
+ /// </summary>
+ public class MyTask : ITask
+ {
+     /// <summary>
+     /// Releases nothing; the task holds no resources.
+     /// </summary>
+     public void Dispose()
+     {
+         // Intentionally empty. Dispose must be safe to call, so it no longer
+         // throws NotImplementedException (see CA1065).
+     }
+
+     /// <summary>
+     /// Not implemented; always throws.
+     /// </summary>
+     /// <exception cref="NotImplementedException">Always.</exception>
+     public byte[] Call(byte[] memento)
+     {
+         throw new NotImplementedException();
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
new file mode 100644
index 0000000..b244f33
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+ [TestClass]
+ public class GroupCommunicationTreeTopologyTests
+ {
+ /// <summary>
+ /// Adds task1..task7 to a tree topology built with trailing argument 2 and checks
+ /// that a task configuration can be produced for each of them.
+ /// </summary>
+ [TestMethod]
+ public void TestTreeTopology()
+ {
+     TreeTopology<int, IntCodec> topology = new TreeTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
+         new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
+     for (int i = 1; i < 8; i++)
+     {
+         topology.AddTask("task" + i);
+     }
+
+     for (int i = 1; i < 8; i++)
+     {
+         // The result used to be stored in an unused local, so the test asserted
+         // nothing; at least require that a configuration is returned.
+         Assert.IsNotNull(topology.GetTaskConfiguration("task" + i));
+     }
+ }
+
+ [TestMethod]
+ public void TestReduceOperator()
+ {
+     // Nine senders (task1..task9) each contribute their own task number to a
+     // tree-topology sum reduce; task0 must receive 1 + 2 + ... + 9 = 45.
+     string group = "group1";
+     string reduceName = "reduce";
+     int taskCount = 10;
+     string driver = "driverId";
+     string master = "task0";
+     int fanOutDegree = 3;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddReduce<int, IntCodec>(reduceName, master, new SumFunction(), TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IReduceReceiver<int> receiver = clients[0].GetReduceReceiver<int>(reduceName);
+     var senders = new List<IReduceSender<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         senders.Add(clients[task].GetReduceSender<int>(reduceName));
+     }
+
+     Assert.IsNotNull(receiver);
+     foreach (IReduceSender<int> reduceSender in senders)
+     {
+         Assert.IsNotNull(reduceSender);
+     }
+
+     // Send from the highest task number down to task1.
+     for (int task = taskCount - 1; task >= 1; task--)
+     {
+         senders[task - 1].Send(task);
+     }
+
+     Assert.AreEqual(45, receiver.Reduce());
+ }
+
+ [TestMethod]
+ public void TestBroadcastOperator()
+ {
+     // Broadcast three values in turn over a tree topology; after each send,
+     // every receiver (task1..task9, in that order) must observe the value.
+     string group = "group1";
+     string broadcastName = "broadcast";
+     string driver = "driverId";
+     string master = "task0";
+     int taskCount = 10;
+     int[] payloads = { 1337, 42, 99 };
+     int fanOutDegree = 3;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddBroadcast<int, IntCodec>(broadcastName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IBroadcastSender<int> sender = clients[0].GetBroadcastSender<int>(broadcastName);
+     var receivers = new List<IBroadcastReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetBroadcastReceiver<int>(broadcastName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IBroadcastReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     foreach (int payload in payloads)
+     {
+         sender.Send(payload);
+         foreach (IBroadcastReceiver<int> receiver in receivers)
+         {
+             Assert.AreEqual(payload, receiver.Receive());
+         }
+     }
+ }
+
+
+ /// <summary>
+ /// Runs ten rounds of broadcast followed by reduce over tree topologies: task0
+ /// broadcasts i, each of the nine other tasks replies with the i-th triangle
+ /// number, and task0 must reduce them to TriangleNumber(i) * 9.
+ /// </summary>
+ [TestMethod]
+ public void TestBroadcastReduceOperators()
+ {
+     string groupName = "group1";
+     string broadcastOperatorName = "broadcast";
+     string reduceOperatorName = "reduce";
+     string masterTaskId = "task0";
+     string driverId = "Driver Id";
+     int numTasks = 10;
+     int fanOut = 3;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+         .AddBroadcast<int, IntCodec>(
+             broadcastOperatorName,
+             masterTaskId,
+             TopologyTypes.Tree)
+         .AddReduce<int, IntCodec>(
+             reduceOperatorName,
+             masterTaskId,
+             new SumFunction(),
+             TopologyTypes.Tree)
+         .Build();
+
+     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+     // Master task: broadcasts the input and collects the reduced sum.
+     IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
+     IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+     // Worker tasks 1..9: index k in these lists belongs to task k + 1.
+     var broadcastReceivers = new List<IBroadcastReceiver<int>>();
+     var triangleNumberSenders = new List<IReduceSender<int>>();
+     for (int task = 1; task < numTasks; task++)
+     {
+         broadcastReceivers.Add(commGroups[task].GetBroadcastReceiver<int>(broadcastOperatorName));
+         triangleNumberSenders.Add(commGroups[task].GetReduceSender<int>(reduceOperatorName));
+     }
+
+     for (int i = 1; i <= 10; i++)
+     {
+         broadcastSender.Send(i);
+
+         // Drain every receiver first (task1..task9), then verify the values.
+         int[] received = new int[broadcastReceivers.Count];
+         for (int k = 0; k < received.Length; k++)
+         {
+             received[k] = broadcastReceivers[k].Receive();
+         }
+
+         foreach (int value in received)
+         {
+             Assert.AreEqual(i, value);
+         }
+
+         // Contributions are sent from the highest task number down to task1.
+         for (int k = received.Length - 1; k >= 0; k--)
+         {
+             int triangleNum = GroupCommunicationTests.TriangleNumber(received[k]);
+             triangleNumberSenders[k].Send(triangleNum);
+         }
+
+         int sum = sumReducer.Reduce();
+         int expected = GroupCommunicationTests.TriangleNumber(i) * (numTasks - 1);
+
+         // MSTest takes (expected, actual); the arguments used to be swapped,
+         // which produced a misleading failure message.
+         Assert.AreEqual(expected, sum);
+     }
+ }
+
+ [TestMethod]
+ public void TestScatterOperator()
+ {
+     // Scatter { 1, 2, 3, 4 } over a tree topology with five tasks. Receivers 1
+     // and 2 are expected to see two elements each; receivers 3 and 4 exactly one.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 5;
+     int fanOutDegree = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4 });
+
+     // Leading elements expected at receivers 1 and 2.
+     int[][] expectedPairs =
+     {
+         new[] { 1, 2 },
+         new[] { 3, 4 }
+     };
+     for (int r = 0; r < expectedPairs.Length; r++)
+     {
+         int[] received = receivers[r].Receive().ToArray();
+         for (int j = 0; j < expectedPairs[r].Length; j++)
+         {
+             Assert.AreEqual(expectedPairs[r][j], received[j]);
+         }
+     }
+
+     // Single() also enforces that these chunks contain exactly one element.
+     Assert.AreEqual(1, receivers[2].Receive().Single());
+     Assert.AreEqual(2, receivers[3].Receive().Single());
+ }
+
+ [TestMethod]
+ public void TestScatterOperator2()
+ {
+     // Scatter 1..8 over a tree topology with five tasks. Receivers 1 and 2 are
+     // checked element by element; receivers 3 and 4 by first and last element.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 5;
+     int fanOutDegree = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+     // Receivers 1 and 2: four leading elements each.
+     int[][] expectedQuads =
+     {
+         new[] { 1, 2, 3, 4 },
+         new[] { 5, 6, 7, 8 }
+     };
+     for (int r = 0; r < expectedQuads.Length; r++)
+     {
+         int[] received = receivers[r].Receive().ToArray();
+         for (int j = 0; j < expectedQuads[r].Length; j++)
+         {
+             Assert.AreEqual(expectedQuads[r][j], received[j]);
+         }
+     }
+
+     // Receivers 3 and 4: only the ends of each chunk are compared.
+     var thirdChunk = receivers[2].Receive();
+     Assert.AreEqual(1, thirdChunk.First());
+     Assert.AreEqual(2, thirdChunk.Last());
+
+     var fourthChunk = receivers[3].Receive();
+     Assert.AreEqual(3, fourthChunk.First());
+     Assert.AreEqual(4, fourthChunk.Last());
+ }
+
+ [TestMethod]
+ public void TestScatterOperator3()
+ {
+     // Scatter 1..8 over a tree topology with four tasks. Receivers 1 and 2 are
+     // expected to start with 1..4 and 5..8; receiver 3 with 1..4 again.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 4;
+     int fanOutDegree = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+     // Expected leading elements per receiver, in task order.
+     int[][] expectedChunks =
+     {
+         new[] { 1, 2, 3, 4 },
+         new[] { 5, 6, 7, 8 },
+         new[] { 1, 2, 3, 4 }
+     };
+     for (int r = 0; r < expectedChunks.Length; r++)
+     {
+         int[] received = receivers[r].Receive().ToArray();
+         for (int j = 0; j < expectedChunks[r].Length; j++)
+         {
+             Assert.AreEqual(expectedChunks[r][j], received[j]);
+         }
+     }
+ }
+
+ [TestMethod]
+ public void TestScatterOperator4()
+ {
+     // Scatter 1..8 over a tree topology with an explicit order (task2, task1).
+     // task2 is expected to start with 1..4, task1 with 5..8, and task3 with 5..8.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 4;
+     int fanOutDegree = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     List<int> payload = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
+     List<string> taskOrder = new List<string> { "task2", "task1" };
+     sender.Send(payload, taskOrder);
+
+     // Drain order: task2, then task1, then task3.
+     var drainOrder = new[] { receivers[1], receivers[0], receivers[2] };
+     int[][] expectedChunks =
+     {
+         new[] { 1, 2, 3, 4 },
+         new[] { 5, 6, 7, 8 },
+         new[] { 5, 6, 7, 8 }
+     };
+     for (int r = 0; r < drainOrder.Length; r++)
+     {
+         int[] received = drainOrder[r].Receive().ToArray();
+         for (int j = 0; j < expectedChunks[r].Length; j++)
+         {
+             Assert.AreEqual(expectedChunks[r][j], received[j]);
+         }
+     }
+ }
+
+ [TestMethod]
+ public void TestScatterOperator5()
+ {
+     // Scatter 1..8 over a tree topology with six tasks and compare the leading
+     // elements seen by each of the five receivers.
+     string group = "group1";
+     string scatterName = "scatter";
+     string master = "task0";
+     string driver = "Driver Id";
+     int taskCount = 6;
+     int fanOutDegree = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driver, master, group, fanOutDegree, taskCount);
+
+     ICommunicationGroupDriver groupDriver = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(scatterName, master, TopologyTypes.Tree)
+         .Build();
+
+     var clients = GroupCommunicationTests.CommGroupClients(group, taskCount, mpiDriver, groupDriver);
+
+     IScatterSender<int> sender = clients[0].GetScatterSender<int>(scatterName);
+     var receivers = new List<IScatterReceiver<int>>();
+     for (int task = 1; task < taskCount; task++)
+     {
+         receivers.Add(clients[task].GetScatterReceiver<int>(scatterName));
+     }
+
+     Assert.IsNotNull(sender);
+     foreach (IScatterReceiver<int> receiver in receivers)
+     {
+         Assert.IsNotNull(receiver);
+     }
+
+     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+     // Expected leading elements per receiver, in task order (task1..task5).
+     int[][] expectedChunks =
+     {
+         new[] { 1, 2, 3, 4 },
+         new[] { 5, 6, 7, 8 },
+         new[] { 1, 2 },
+         new[] { 3, 4 },
+         new[] { 5, 6, 7, 8 }
+     };
+     for (int r = 0; r < expectedChunks.Length; r++)
+     {
+         int[] received = receivers[r].Receive().ToArray();
+         for (int j = 0; j < expectedChunks[r].Length; j++)
+         {
+             Assert.AreEqual(expectedChunks[r][j], received[j]);
+         }
+     }
+ }
+
+ /// <summary>
+ /// Scatters 1..100 from task0 over a tree topology, lets each of the four other
+ /// tasks send the sum of its chunk through a tree-topology reduce, and checks
+ /// that task0 reduces the contributions to 6325.
+ /// </summary>
+ [TestMethod]
+ public void TestScatterReduceOperators()
+ {
+     string groupName = "group1";
+     string scatterOperatorName = "scatter";
+     string reduceOperatorName = "reduce";
+     string masterTaskId = "task0";
+     string driverId = "Driver Id";
+     int numTasks = 5;
+     int fanOut = 2;
+
+     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
+
+     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+         .AddScatter<int, IntCodec>(
+             scatterOperatorName,
+             masterTaskId,
+             TopologyTypes.Tree)
+         .AddReduce<int, IntCodec>(
+             reduceOperatorName,
+             masterTaskId,
+             new SumFunction(),
+             TopologyTypes.Tree).Build();
+
+     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
+
+     // Master task: scatters the data and collects the reduced sum.
+     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
+     IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+     IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
+     IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
+
+     IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
+     IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
+
+     IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
+     IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
+
+     IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
+     IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
+
+     Assert.IsNotNull(sender);
+     Assert.IsNotNull(receiver1);
+     Assert.IsNotNull(receiver2);
+     Assert.IsNotNull(receiver3);
+     Assert.IsNotNull(receiver4);
+
+     List<int> data = Enumerable.Range(1, 100).ToList();
+
+     sender.Send(data);
+
+     // Chunks are received in task order: task1, task2, task3, task4.
+     List<int> data1 = receiver1.Receive();
+     List<int> data2 = receiver2.Receive();
+
+     List<int> data3 = receiver3.Receive();
+     List<int> data4 = receiver4.Receive();
+
+     // Partial sums are sent in the order task3, task4, task2, task1.
+     int sum3 = data3.Sum();
+     sumSender3.Send(sum3);
+
+     int sum4 = data4.Sum();
+     sumSender4.Send(sum4);
+
+     int sum2 = data2.Sum();
+     sumSender2.Send(sum2);
+
+     int sum1 = data1.Sum();
+     sumSender1.Send(sum1);
+
+     int sum = sumReducer.Reduce();
+
+     // MSTest takes (expected, actual); the arguments used to be swapped,
+     // which produced a misleading failure message.
+     Assert.AreEqual(6325, sum);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
new file mode 100644
index 0000000..fd3002c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NamingService
+{
+ [TestClass]
+ public class NameServerTests
+ {
+        /// <summary>
+        /// Constructs a name server with port argument 0 and disposes it again
+        /// without any client ever talking to it.
+        /// </summary>
+        [TestMethod]
+        public void TestNameServerNoRequests()
+        {
+            using (NameServer idleServer = new NameServer(0))
+            {
+                // Intentionally empty: only construction and disposal are exercised.
+            }
+        }
+
+        /// <summary>
+        /// Connects two clients to one name server, registers an identifier through
+        /// the second client and looks it up through the first.
+        /// The test passes as long as no call throws; the lookup result is not inspected.
+        /// </summary>
+        [TestMethod]
+        public void TestNameServerNoRequestsTwoClients()
+        {
+            // The clients are disposable as well; the stacked usings dispose them
+            // (in reverse order) before the server instead of leaking them.
+            using (var server = new NameServer(0))
+            using (var nameClient = new NameClient(server.LocalEndpoint))
+            using (var nameClient2 = new NameClient(server.LocalEndpoint))
+            {
+                nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
+                nameClient.Lookup("1");
+            }
+        }
+
+        /// <summary>
+        /// Connects two clients to one name server, registers an identifier through
+        /// the second client and looks it up through the first.
+        /// The test passes as long as no call throws; the lookup result is not inspected.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): the scenario is identical to TestNameServerNoRequestsTwoClients;
+        /// confirm whether a different scenario was intended here.
+        /// </remarks>
+        [TestMethod]
+        public void TestNameServerNoRequestsTwoClients2()
+        {
+            // The clients are disposable as well; the stacked usings dispose them
+            // (in reverse order) before the server instead of leaking them.
+            using (var server = new NameServer(0))
+            using (var nameClient = new NameClient(server.LocalEndpoint))
+            using (var nameClient2 = new NameClient(server.LocalEndpoint))
+            {
+                nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
+                nameClient.Lookup("1");
+            }
+        }
+
+        /// <summary>
+        /// Connects two clients to one name server, registers an identifier through
+        /// the first client and looks it up through the second.
+        /// The test passes as long as no call throws; the lookup result is not inspected.
+        /// </summary>
+        [TestMethod]
+        public void TestNameServerMultipleRequestsTwoClients()
+        {
+            // The clients are disposable as well; the stacked usings dispose them
+            // (in reverse order) before the server instead of leaking them.
+            using (var server = new NameServer(0))
+            using (var nameClient = new NameClient(server.LocalEndpoint))
+            using (var nameClient2 = new NameClient(server.LocalEndpoint))
+            {
+                nameClient.Register("1", new IPEndPoint(IPAddress.Any, 8080));
+                nameClient2.Lookup("1");
+            }
+        }
+
+        /// <summary>
+        /// Verifies that identifiers are unknown before registration and resolve to
+        /// the registered endpoints afterwards.
+        /// </summary>
+        [TestMethod]
+        public void TestRegister()
+        {
+            using (INameServer nameServer = BuildNameServer())
+            using (INameClient nameClient = BuildNameClient(nameServer.LocalEndpoint))
+            {
+                string[] ids = { "a", "b", "c" };
+                IPEndPoint[] endpoints =
+                {
+                    new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100),
+                    new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200),
+                    new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300)
+                };
+
+                // Nothing has been registered yet, so every lookup yields null
+                foreach (string id in ids)
+                {
+                    Assert.IsNull(nameClient.Lookup(id));
+                }
+
+                // Register each identifier with its endpoint
+                for (int i = 0; i < ids.Length; i++)
+                {
+                    nameClient.Register(ids[i], endpoints[i]);
+                }
+
+                // Each identifier now resolves to the endpoint it was registered with
+                for (int i = 0; i < ids.Length; i++)
+                {
+                    Assert.AreEqual(endpoints[i], nameClient.Lookup(ids[i]));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Registers an identifier, unregisters it again and verifies that a
+        /// subsequent lookup returns null.
+        /// </summary>
+        [TestMethod]
+        public void TestUnregister()
+        {
+            using (INameServer nameServer = BuildNameServer())
+            using (INameClient nameClient = BuildNameClient(nameServer.LocalEndpoint))
+            {
+                const string id = "a";
+                IPEndPoint registered = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
+
+                // The identifier resolves to the endpoint once it is registered
+                nameClient.Register(id, registered);
+                Assert.AreEqual(registered, nameClient.Lookup(id));
+
+                // Remove the registration, then wait one second before checking
+                // (presumably the unregister request is handled asynchronously - TODO confirm)
+                nameClient.Unregister(id);
+                Thread.Sleep(1000);
+
+                // After unregistering, the identifier no longer resolves
+                Assert.IsNull(nameClient.Lookup(id));
+            }
+        }
+
+        /// <summary>
+        /// Verifies that registering an identifier a second time replaces the
+        /// endpoint returned by lookups of that identifier.
+        /// </summary>
+        [TestMethod]
+        public void TestLookup()
+        {
+            using (INameServer nameServer = BuildNameServer())
+            using (INameClient nameClient = BuildNameClient(nameServer.LocalEndpoint))
+            {
+                IPEndPoint[] successiveEndpoints =
+                {
+                    new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100),
+                    new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200)
+                };
+
+                // Register identifier "a" with each endpoint in turn; every lookup
+                // must return the most recently registered endpoint
+                foreach (IPEndPoint current in successiveEndpoints)
+                {
+                    nameClient.Register("a", current);
+                    Assert.AreEqual(current, nameClient.Lookup("a"));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Registers three identifiers and looks up a list containing those three
+        /// plus one unregistered identifier; expects exactly the three registered
+        /// assignments back, in request order.
+        /// </summary>
+        [TestMethod]
+        public void TestLookupList()
+        {
+            using (INameServer server = BuildNameServer())
+            {
+                using (INameClient client = BuildNameClient(server.LocalEndpoint))
+                {
+                    IPEndPoint endpoint1 = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
+                    IPEndPoint endpoint2 = new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200);
+                    IPEndPoint endpoint3 = new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300);
+
+                    // Register endpoints
+                    client.Register("a", endpoint1);
+                    client.Register("b", endpoint2);
+                    client.Register("c", endpoint3);
+
+                    // Look up all registered ids plus one ("d") that was never registered
+                    List<string> ids = new List<string> { "a", "b", "c", "d" };
+                    List<NameAssignment> assignments = client.Lookup(ids);
+
+                    // Check the count first: "d" must be absent, and a short result then
+                    // fails as an assertion instead of an ArgumentOutOfRangeException below
+                    Assert.AreEqual(3, assignments.Count);
+
+                    // Check that a, b, and c are registered
+                    Assert.AreEqual("a", assignments[0].Identifier);
+                    Assert.AreEqual(endpoint1, assignments[0].Endpoint);
+                    Assert.AreEqual("b", assignments[1].Identifier);
+                    Assert.AreEqual(endpoint2, assignments[1].Endpoint);
+                    Assert.AreEqual("c", assignments[2].Identifier);
+                    Assert.AreEqual(endpoint3, assignments[2].Endpoint);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Verifies that a client can be pointed at a new name server via Restart
+        /// after the server it was originally connected to has been disposed.
+        /// </summary>
+        [TestMethod]
+        public void TestNameClientRestart()
+        {
+            int oldPort = 6666;
+            int newPort = 6662;
+            INameServer server = new NameServer(oldPort);
+
+            // try/finally guarantees that whichever server is currently alive gets
+            // disposed even when an assertion or call throws; otherwise its fixed
+            // port would stay bound for the remaining tests.
+            try
+            {
+                using (INameClient client = BuildNameClient(server.LocalEndpoint))
+                {
+                    IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
+
+                    client.Register("a", endpoint);
+                    Assert.AreEqual(endpoint, client.Lookup("a"));
+
+                    // Shut the first server down; clear the reference so the finally
+                    // block never disposes the same instance a second time
+                    server.Dispose();
+                    server = null;
+
+                    server = new NameServer(newPort);
+                    client.Restart(server.LocalEndpoint);
+
+                    client.Register("b", endpoint);
+                    Assert.AreEqual(endpoint, client.Lookup("b"));
+                }
+            }
+            finally
+            {
+                if (server != null)
+                {
+                    server.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Verifies that Tang can instantiate a class whose injectable constructor
+        /// takes a NameClient, given the name server address and port configuration.
+        /// </summary>
+        [TestMethod]
+        public void TestConstructorInjection()
+        {
+            // Port 0 plus LocalEndpoint (as the other tests in this class do) avoids
+            // a fixed port, which fails when that port is already bound, e.g. by a
+            // server left over from another test.
+            using (INameServer server = new NameServer(0))
+            {
+                IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule
+                    .Set(NamingConfiguration.NameServerAddress, server.LocalEndpoint.Address.ToString())
+                    .Set(NamingConfiguration.NameServerPort, server.LocalEndpoint.Port + string.Empty)
+                    .Build();
+
+                ConstructorInjection c = TangFactory.GetTang()
+                    .NewInjector(nameClientConfiguration)
+                    .GetInstance<ConstructorInjection>();
+
+                Assert.IsNotNull(c);
+            }
+        }
+
+        /// <summary>
+        /// Obtains an INameServer from a Tang injector whose configuration binds
+        /// the NameServerPort named parameter to "0".
+        /// </summary>
+        /// <returns>The injected name server instance.</returns>
+        private INameServer BuildNameServer()
+        {
+            var serverConfiguration = TangFactory.GetTang()
+                .NewConfigurationBuilder()
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class, "0")
+                .Build();
+
+            var injector = TangFactory.GetTang().NewInjector(serverConfiguration);
+            return injector.GetInstance<INameServer>();
+        }
+
+        /// <summary>
+        /// Obtains a NameClient from a Tang injector configured with the address
+        /// and port of the given name server endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">Endpoint of the name server to connect to.</param>
+        /// <returns>The injected name client.</returns>
+        private INameClient BuildNameClient(IPEndPoint remoteEndpoint)
+        {
+            IConfiguration clientConfiguration = NamingConfiguration.ConfigurationModule
+                .Set(NamingConfiguration.NameServerAddress, remoteEndpoint.Address.ToString())
+                .Set(NamingConfiguration.NameServerPort, remoteEndpoint.Port + string.Empty)
+                .Build();
+
+            var injector = TangFactory.GetTang().NewInjector(clientConfiguration);
+            return injector.GetInstance<NameClient>();
+        }
+
+        /// <summary>
+        /// Injection target whose only constructor takes a NameClient; used by
+        /// TestConstructorInjection to check that Tang can supply one.
+        /// </summary>
+        private class ConstructorInjection
+        {
+            /// <summary>
+            /// Accepts the injected client and rejects a null one.
+            /// </summary>
+            /// <param name="client">The injected name client.</param>
+            /// <exception cref="ArgumentNullException">client is null.</exception>
+            [Inject]
+            public ConstructorInjection(NameClient client)
+            {
+                if (client != null)
+                {
+                    return;
+                }
+
+                throw new ArgumentNullException("client");
+            }
+        }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
new file mode 100644
index 0000000..1489b3c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+ [TestClass]
+ public class NetworkServiceTests
+ {
+        /// <summary>
+        /// Sends three messages from one network service to another and checks
+        /// that the receiving handler sees them in the order they were written.
+        /// </summary>
+        [TestMethod]
+        public void TestNetworkServiceOneWayCommunication()
+        {
+            int senderPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int receiverPort = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            // Filled by the receiver's MessageHandler with every message it observes
+            var received = new BlockingCollection<string>();
+
+            using (INameServer nameServer = new NameServer(0))
+            {
+                IPEndPoint nameServerEndpoint = nameServer.LocalEndpoint;
+                int nameServerPort = nameServerEndpoint.Port;
+                string nameServerAddr = nameServerEndpoint.Address.ToString();
+
+                using (INetworkService<string> sender = BuildNetworkService(senderPort, nameServerPort, nameServerAddr, null))
+                using (INetworkService<string> receiver = BuildNetworkService(receiverPort, nameServerPort, nameServerAddr, new MessageHandler(received)))
+                {
+                    IIdentifier senderId = new StringIdentifier("service1");
+                    IIdentifier receiverId = new StringIdentifier("service2");
+                    sender.Register(senderId);
+                    receiver.Register(receiverId);
+
+                    string[] messages = { "abc", "def", "ghi" };
+
+                    using (IConnection<string> connection = sender.NewConnection(receiverId))
+                    {
+                        connection.Open();
+                        foreach (string message in messages)
+                        {
+                            connection.Write(message);
+                        }
+
+                        // Take blocks until the next message has been queued
+                        foreach (string expected in messages)
+                        {
+                            Assert.AreEqual(expected, received.Take());
+                        }
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Lets two network services send messages to each other and checks that
+        /// each handler receives the other side's messages in the order written.
+        /// </summary>
+        [TestMethod]
+        public void TestNetworkServiceTwoWayCommunication()
+        {
+            int firstPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int secondPort = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            // One inbox per service, filled by that service's MessageHandler
+            var firstInbox = new BlockingCollection<string>();
+            var secondInbox = new BlockingCollection<string>();
+
+            using (INameServer nameServer = new NameServer(0))
+            {
+                IPEndPoint nameServerEndpoint = nameServer.LocalEndpoint;
+                int nameServerPort = nameServerEndpoint.Port;
+                string nameServerAddr = nameServerEndpoint.Address.ToString();
+
+                using (INetworkService<string> first = BuildNetworkService(firstPort, nameServerPort, nameServerAddr, new MessageHandler(firstInbox)))
+                using (INetworkService<string> second = BuildNetworkService(secondPort, nameServerPort, nameServerAddr, new MessageHandler(secondInbox)))
+                {
+                    IIdentifier firstId = new StringIdentifier("service1");
+                    IIdentifier secondId = new StringIdentifier("service2");
+                    first.Register(firstId);
+                    second.Register(secondId);
+
+                    string[] firstToSecond = { "abc", "def", "ghi" };
+                    string[] secondToFirst = { "jkl", "mno" };
+
+                    using (IConnection<string> toSecond = first.NewConnection(secondId))
+                    using (IConnection<string> toFirst = second.NewConnection(firstId))
+                    {
+                        toSecond.Open();
+                        foreach (string message in firstToSecond)
+                        {
+                            toSecond.Write(message);
+                        }
+
+                        toFirst.Open();
+                        foreach (string message in secondToFirst)
+                        {
+                            toFirst.Write(message);
+                        }
+
+                        // Take blocks until the next message has been queued
+                        foreach (string expected in firstToSecond)
+                        {
+                            Assert.AreEqual(expected, secondInbox.Take());
+                        }
+
+                        foreach (string expected in secondToFirst)
+                        {
+                            Assert.AreEqual(expected, firstInbox.Take());
+                        }
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a string network service that talks to the given name server.
+        /// When <paramref name="handler"/> is null the whole service is obtained
+        /// through Tang injection with NetworkMessageHandler bound as the message
+        /// observer; otherwise only the NameClient is injected and the service is
+        /// constructed directly with the supplied handler.
+        /// </summary>
+        /// <param name="networkServicePort">Port passed to the network service.</param>
+        /// <param name="nameServicePort">Port of the name server.</param>
+        /// <param name="nameServiceAddr">Address of the name server.</param>
+        /// <param name="handler">Observer for incoming messages, or null to use injection.</param>
+        /// <returns>The network service instance.</returns>
+        private INetworkService<string> BuildNetworkService(
+            int networkServicePort,
+            int nameServicePort,
+            string nameServiceAddr,
+            IObserver<NsMessage<string>> handler)
+        {
+            // The port values are machine-readable configuration strings, not UI text,
+            // so they are formatted with the invariant culture.
+            string nameServicePortValue = nameServicePort.ToString(CultureInfo.InvariantCulture);
+
+            // Test injection
+            if (handler == null)
+            {
+                var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
+                        GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                        networkServicePort.ToString(CultureInfo.InvariantCulture))
+                    .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                        GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                        nameServicePortValue)
+                    .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                        GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                        nameServiceAddr)
+                    .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
+                    .BindImplementation(GenericType<ICodec<string>>.Class, GenericType<StringCodec>.Class)
+                    .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class, GenericType<NetworkMessageHandler>.Class)
+                    .Build();
+
+                return TangFactory.GetTang().NewInjector(networkServiceConf).GetInstance<NetworkService<string>>();
+            }
+
+            // A handler was supplied: inject only the name client, then wire the
+            // network service up by hand
+            var nameserverConf = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    nameServicePortValue)
+                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    nameServiceAddr)
+                .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
+                .Build();
+
+            var nameClient = TangFactory.GetTang().NewInjector(nameserverConf).GetInstance<NameClient>();
+            return new NetworkService<string>(networkServicePort,
+                handler, new StringIdentifierFactory(), new StringCodec(), nameClient);
+        }
+
+        /// <summary>
+        /// Observer that stores the first data element of every received message
+        /// in the blocking collection handed to its constructor.
+        /// </summary>
+        private class MessageHandler : IObserver<NsMessage<string>>
+        {
+            private readonly BlockingCollection<string> _received;
+
+            /// <summary>
+            /// Creates a handler that adds incoming payloads to the given collection.
+            /// </summary>
+            /// <param name="queue">Collection that receives the message payloads.</param>
+            public MessageHandler(BlockingCollection<string> queue)
+            {
+                _received = queue;
+            }
+
+            /// <summary>
+            /// Adds the first element of the message's data to the collection.
+            /// </summary>
+            /// <param name="value">The received message.</param>
+            public void OnNext(NsMessage<string> value)
+            {
+                string firstElement = value.Data.First();
+                _received.Add(firstElement);
+            }
+
+            /// <summary>
+            /// Not implemented; always throws NotImplementedException.
+            /// </summary>
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            /// <summary>
+            /// Not implemented; always throws NotImplementedException.
+            /// </summary>
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// Injectable observer that ignores every notification; BuildNetworkService
+        /// binds it as the message handler when no explicit handler is supplied.
+        /// </summary>
+        private class NetworkMessageHandler : IObserver<NsMessage<string>>
+        {
+            [Inject]
+            public NetworkMessageHandler()
+            {
+                // Nothing to initialise
+            }
+
+            public void OnNext(NsMessage<string> value)
+            {
+                // Messages are deliberately dropped
+            }
+
+            public void OnError(Exception error)
+            {
+                // Errors are deliberately ignored
+            }
+
+            public void OnCompleted()
+            {
+                // Completion is deliberately ignored
+            }
+        }
+ }
+}
[2/3] incubator-reef git commit: [REEF-245] Move Network test from
REEF.Tests to REEF.Network.Tests project
Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
new file mode 100644
index 0000000..f67e0ea
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <ProjectGuid>{2D30B07C-4DDC-4932-A293-C5CAB6BE34D9}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Network.Tests</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Network.Tests</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ <ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
+ <VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">10.0</VisualStudioVersion>
+ <VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
+ <ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
+ <IsCodedUITest>False</IsCodedUITest>
+ <TestProjectType>UnitTest</TestProjectType>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\build.props" />
+ <PropertyGroup>
+ <BuildPackage>false</BuildPackage>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
+ <Reference Include="System" />
+ <Reference Include="System.Reactive.Core">
+ <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BlockingCollectionExtensionTests.cs" />
+ <Compile Include="GroupCommunication\GroupCommunicationTests.cs" />
+ <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
+ <Compile Include="NamingService\NameServerTests.cs" />
+ <Compile Include="NetworkService\NetworkServiceTests.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+ <Name>Org.Apache.REEF.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+ <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
+ <Name>Org.Apache.REEF.Network</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..7754581
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network.Tests")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("146f73c7-6a44-4e98-9b4a-be2584930a66")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/packages.config b/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
new file mode 100644
index 0000000..75e5b34
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
deleted file mode 100644
index 1737888..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.Utilities;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class BlockingCollectionExtensionTests
- {
- [TestMethod]
- public void TestCollectionContainsElement()
- {
- string item = "abc";
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add(item);
-
- Assert.AreEqual(item, collection.Take(item));
-
- // Check that item is no longer in collection
- Assert.AreEqual(0, collection.Count);
- }
-
- [TestMethod]
- public void TestCollectionContainsElement2()
- {
- string item = "abc";
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add("cat");
- collection.Add(item);
- collection.Add("dog");
-
- Assert.AreEqual(item, collection.Take(item));
-
- // Remove remaining items, check that item is not there
- Assert.AreNotEqual(item, collection.Take());
- Assert.AreNotEqual(item, collection.Take());
- Assert.AreEqual(0, collection.Count);
- }
-
- [TestMethod]
- [ExpectedException(typeof(InvalidOperationException))]
- public void TestCollectionDoesNotContainsElement()
- {
- string item1 = "abc";
- string item2 = "def";
-
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add(item2);
-
- // Should throw InvalidOperationException since item1 is not in collection
- collection.Take(item1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
deleted file mode 100644
index 5f931f2..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
+++ /dev/null
@@ -1,742 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Reactive;
-using System.Text;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Codec;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class GroupCommunicationTests
- {
- [TestMethod]
- public void TestSender()
- {
- using (NameServer nameServer = new NameServer(0))
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>();
- BlockingCollection<GroupCommunicationMessage> messages2 = new BlockingCollection<GroupCommunicationMessage>();
-
- var handler1 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
- msg => messages1.Add(msg.Data.First()));
- var handler2 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
- msg => messages2.Add(msg.Data.First()));
-
- var networkService1 = BuildNetworkService(endpoint, handler1);
- var networkService2 = BuildNetworkService(endpoint, handler2);
-
- networkService1.Register(new StringIdentifier("id1"));
- networkService2.Register(new StringIdentifier("id2"));
-
- Sender sender1 = new Sender(networkService1, new StringIdentifierFactory());
- Sender sender2 = new Sender(networkService2, new StringIdentifierFactory());
-
- sender1.Send(CreateGcm("abc", "id1", "id2"));
- sender1.Send(CreateGcm("def", "id1", "id2"));
-
- sender2.Send(CreateGcm("ghi", "id2", "id1"));
-
- string msg1 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
- string msg2 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
- Assert.AreEqual("abc", msg1);
- Assert.AreEqual("def", msg2);
-
- string msg3 = Encoding.UTF8.GetString(messages1.Take().Data[0]);
- Assert.AreEqual("ghi", msg3);
- }
- }
-
- [TestMethod]
- public void TestBroadcastReduceOperators()
- {
- string groupName = "group1";
- string broadcastOperatorName = "broadcast";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 3;
- int fanOut = 2;
-
- var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(
- broadcastOperatorName,
- masterTaskId)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction())
- .Build();
-
- var commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- //for master task
- IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- for (int j = 1; j <= 10; j++)
- {
- broadcastSender.Send(j);
-
- int n1 = broadcastReceiver1.Receive();
- int n2 = broadcastReceiver2.Receive();
- Assert.AreEqual(j, n1);
- Assert.AreEqual(j, n2);
-
- int triangleNum1 = TriangleNumber(n1);
- triangleNumberSender1.Send(triangleNum1);
- int triangleNum2 = TriangleNumber(n2);
- triangleNumberSender2.Send(triangleNum2);
-
- int sum = sumReducer.Reduce();
- int expected = TriangleNumber(j) * (numTasks - 1);
- Assert.AreEqual(sum, expected);
- }
- }
-
- [TestMethod]
- public void TestScatterReduceOperators()
- {
- string groupName = "group1";
- string scatterOperatorName = "scatter";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(
- scatterOperatorName,
- masterTaskId)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = Enumerable.Range(1, 100).ToList();
- List<string> order = new List<string> {"task4", "task3", "task2", "task1"};
-
- sender.Send(data, order);
-
- ScatterReceiveReduce(receiver4, sumSender4);
- ScatterReceiveReduce(receiver3, sumSender3);
- ScatterReceiveReduce(receiver2, sumSender2);
- ScatterReceiveReduce(receiver1, sumSender1);
-
- int sum = sumReducer.Reduce();
-
- Assert.AreEqual(sum, data.Sum());
- }
-
- [TestMethod]
- public void TestBroadcastOperator()
- {
- NameServer nameServer = new NameServer(0);
-
- string groupName = "group1";
- string operatorName = "broadcast";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 10;
- int value = 1337;
- int fanOut = 3;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value);
- Assert.AreEqual(value, receiver1.Receive());
- Assert.AreEqual(value, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestBroadcastOperatorWithDefaultCodec()
- {
- NameServer nameServer = new NameServer(0);
-
- string groupName = "group1";
- string operatorName = "broadcast";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 10;
- int value = 1337;
- int fanOut = 3;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value);
- Assert.AreEqual(value, receiver1.Receive());
- Assert.AreEqual(value, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestBroadcastOperator2()
- {
- string groupName = "group1";
- string operatorName = "broadcast";
- string driverId = "driverId";
- string masterTaskId = "task0";
- int numTasks = 3;
- int value1 = 1337;
- int value2 = 42;
- int value3 = 99;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value1);
- Assert.AreEqual(value1, receiver1.Receive());
- Assert.AreEqual(value1, receiver2.Receive());
-
- sender.Send(value2);
- Assert.AreEqual(value2, receiver1.Receive());
- Assert.AreEqual(value2, receiver2.Receive());
-
- sender.Send(value3);
- Assert.AreEqual(value3, receiver1.Receive());
- Assert.AreEqual(value3, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestReduceOperator()
- {
- string groupName = "group1";
- string operatorName = "reduce";
- int numTasks = 4;
- string driverId = "driverid";
- string masterTaskId = "task0";
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
- IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
-
- Assert.IsNotNull(receiver);
- Assert.IsNotNull(sender1);
- Assert.IsNotNull(sender2);
- Assert.IsNotNull(sender3);
-
- sender3.Send(5);
- sender1.Send(1);
- sender2.Send(3);
-
- Assert.AreEqual(9, receiver.Reduce());
- }
-
- [TestMethod]
- public void TestReduceOperator2()
- {
- string groupName = "group1";
- string operatorName = "reduce";
- int numTasks = 4;
- string driverId = "driverid";
- string masterTaskId = "task0";
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
- IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
-
- Assert.IsNotNull(receiver);
- Assert.IsNotNull(sender1);
- Assert.IsNotNull(sender2);
- Assert.IsNotNull(sender3);
-
- sender3.Send(5);
- sender1.Send(1);
- sender2.Send(3);
- Assert.AreEqual(9, receiver.Reduce());
-
- sender3.Send(6);
- sender1.Send(2);
- sender2.Send(4);
- Assert.AreEqual(12, receiver.Reduce());
-
- sender3.Send(9);
- sender1.Send(3);
- sender2.Send(6);
- Assert.AreEqual(18, receiver.Reduce());
- }
-
- [TestMethod]
- public void TestScatterOperator()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4 };
-
- sender.Send(data);
- Assert.AreEqual(1, receiver1.Receive().Single());
- Assert.AreEqual(2, receiver2.Receive().Single());
- Assert.AreEqual(3, receiver3.Receive().Single());
- Assert.AreEqual(4, receiver4.Receive().Single());
- }
-
- [TestMethod]
- public void TestScatterOperatorWithDefaultCodec()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4 };
-
- sender.Send(data);
- Assert.AreEqual(1, receiver1.Receive().Single());
- Assert.AreEqual(2, receiver2.Receive().Single());
- Assert.AreEqual(3, receiver3.Receive().Single());
- Assert.AreEqual(4, receiver4.Receive().Single());
- }
-
- [TestMethod]
- public void TestScatterOperator2()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
- var data1 = receiver1.Receive();
- Assert.AreEqual(1, data1.First());
- Assert.AreEqual(2, data1.Last());
-
- var data2 = receiver2.Receive();
- Assert.AreEqual(3, data2.First());
- Assert.AreEqual(4, data2.Last());
-
- var data3 = receiver3.Receive();
- Assert.AreEqual(5, data3.First());
- Assert.AreEqual(6, data3.Last());
-
- var data4 = receiver4.Receive();
- Assert.AreEqual(7, data4.First());
- Assert.AreEqual(8, data4.Last());
- }
-
- [TestMethod]
- public void TestScatterOperator3()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, data1[0]);
- Assert.AreEqual(2, data1[1]);
- Assert.AreEqual(3, data1[2]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(4, data2[0]);
- Assert.AreEqual(5, data2[1]);
- Assert.AreEqual(6, data2[2]);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(7, data3[0]);
- Assert.AreEqual(8, data3[1]);
- }
-
- [TestMethod]
- public void TestScatterOperator4()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
- List<string> order = new List<string> { "task3", "task2", "task1" };
-
- sender.Send(data, order);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(1, data3[0]);
- Assert.AreEqual(2, data3[1]);
- Assert.AreEqual(3, data3[2]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(4, data2[0]);
- Assert.AreEqual(5, data2[1]);
- Assert.AreEqual(6, data2[2]);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(7, data1[0]);
- Assert.AreEqual(8, data1[1]);
- }
-
- [TestMethod]
- public void TestConfigurationBroadcastSpec()
- {
- FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
- new BroadcastOperatorSpec<int, IntCodec>("Sender"));
-
- topology.AddTask("task1");
- var conf = topology.GetTaskConfiguration("task1");
-
- ICodec<int> codec = TangFactory.GetTang().NewInjector(conf).GetInstance<ICodec<int>>();
- Assert.AreEqual(3, codec.Decode(codec.Encode(3)));
- }
-
- [TestMethod]
- public void TestConfigurationReduceSpec()
- {
- FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Group", "task1", "driverid",
- new ReduceOperatorSpec<int, IntCodec>("task1", new SumFunction()));
-
- topology.AddTask("task1");
- var conf2 = topology.GetTaskConfiguration("task1");
-
- IReduceFunction<int> reduceFunction = TangFactory.GetTang().NewInjector(conf2).GetInstance<IReduceFunction<int>>();
- Assert.AreEqual(10, reduceFunction.Reduce(new int[] { 1, 2, 3, 4 }));
- }
-
- public static IMpiDriver GetInstanceOfMpiDriver(string driverId, string masterTaskId, string groupName, int fanOut, int numTasks)
- {
- var c = TangFactory.GetTang().NewConfigurationBuilder()
- .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId)
- .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId)
- .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName)
- .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString())
- .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString())
- .BindImplementation(GenericType<IConfigurationSerializer>.Class, GenericType<AvroConfigurationSerializer>.Class)
- .Build();
-
- IMpiDriver mpiDriver = TangFactory.GetTang().NewInjector(c).GetInstance<MpiDriver>();
- return mpiDriver;
- }
-
- public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IMpiDriver mpiDriver, ICommunicationGroupDriver commGroup)
- {
- List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>();
- IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
- List<IConfiguration> partialConfigs = new List<IConfiguration>();
- for (int i = 0; i < numTasks; i++)
- {
- string taskId = "task" + i;
- IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, taskId)
- .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
- .Build())
- .Build();
- commGroup.AddTask(taskId);
- partialConfigs.Add(partialTaskConfig);
- }
-
- for (int i = 0; i < numTasks; i++)
- {
- string taskId = "task" + i;
- IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId);
- IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
- IInjector injector = TangFactory.GetTang().NewInjector(mergedConf);
-
- IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
- commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
- }
- return commGroups;
- }
-
- public static NetworkService<GroupCommunicationMessage> BuildNetworkService(
- IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler)
- {
- return new NetworkService<GroupCommunicationMessage>(
- 0, handler, new StringIdentifierFactory(), new GroupCommunicationMessageCodec(), new NameClient(nameServerEndpoint.Address.ToString(), nameServerEndpoint.Port));
- }
-
- private GroupCommunicationMessage CreateGcm(string message, string from, string to)
- {
- byte[] data = Encoding.UTF8.GetBytes(message);
- return new GroupCommunicationMessage("g1", "op1", from, to, data, MessageType.Data);
- }
-
- private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender)
- {
- List<int> data1 = receiver.Receive();
- int sum1 = data1.Sum();
- sumSender.Send(sum1);
- }
-
- public static int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-
- public class SumFunction : IReduceFunction<int>
- {
- [Inject]
- public SumFunction()
- {
- }
-
- public int Reduce(IEnumerable<int> elements)
- {
- return elements.Sum();
- }
- }
-
- public class MyTask : ITask
- {
- public void Dispose()
- {
- throw new NotImplementedException();
- }
-
- public byte[] Call(byte[] memento)
- {
- throw new NotImplementedException();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
deleted file mode 100644
index 667e45f..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
+++ /dev/null
@@ -1,634 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Linq;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class GroupCommunicationTreeTopologyTests
- {
- /// <summary>
- /// Builds a tree topology with fan-out 2 over task1..task7 and checks that a
- /// task configuration can be obtained for every task.
- /// </summary>
- [TestMethod]
- public void TestTreeTopology()
- {
-     TreeTopology<int, IntCodec> topology = new TreeTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
-         new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
-     for (int i = 1; i < 8; i++)
-     {
-         topology.AddTask("task" + i);
-     }
-
-     for (int i = 1; i < 8; i++)
-     {
-         // Assert on the result so the test verifies something beyond "did not throw".
-         Assert.IsNotNull(topology.GetTaskConfiguration("task" + i));
-     }
- }
-
- /// <summary>
- /// Ten tasks in a tree (fan-out 3): tasks 1..9 each send their own index and
- /// the master task must reduce them to 45.
- /// </summary>
- [TestMethod]
- public void TestReduceOperator()
- {
-     string groupName = "group1";
-     string operatorName = "reduce";
-     int numTasks = 10;
-     string driverId = "driverId";
-     string masterTaskId = "task0";
-     int fanOut = 3;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddReduce<int, IntCodec>(operatorName, masterTaskId, new SumFunction(), TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
-
-     // senders[k] belongs to task (k + 1).
-     var senders = new List<IReduceSender<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         senders.Add(commGroups[task].GetReduceSender<int>(operatorName));
-     }
-
-     Assert.IsNotNull(receiver);
-     foreach (IReduceSender<int> reduceSender in senders)
-     {
-         Assert.IsNotNull(reduceSender);
-     }
-
-     // Send in descending task order: task9 sends 9 first, task1 sends 1 last.
-     for (int task = numTasks - 1; task >= 1; task--)
-     {
-         senders[task - 1].Send(task);
-     }
-
-     Assert.AreEqual(45, receiver.Reduce());
- }
-
- /// <summary>
- /// Ten tasks in a tree (fan-out 3): the master broadcasts three values in turn
- /// and every other task must receive each of them.
- /// </summary>
- [TestMethod]
- public void TestBroadcastOperator()
- {
-     string groupName = "group1";
-     string operatorName = "broadcast";
-     string driverId = "driverId";
-     string masterTaskId = "task0";
-     int numTasks = 10;
-     int fanOut = 3;
-     int[] broadcastValues = { 1337, 42, 99 };
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IBroadcastReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetBroadcastReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IBroadcastReceiver<int> broadcastReceiver in receivers)
-     {
-         Assert.IsNotNull(broadcastReceiver);
-     }
-
-     // One full round per value: send once, then every receiver reads in task order.
-     foreach (int value in broadcastValues)
-     {
-         sender.Send(value);
-         foreach (IBroadcastReceiver<int> broadcastReceiver in receivers)
-         {
-             Assert.AreEqual(value, broadcastReceiver.Receive());
-         }
-     }
- }
-
-
- /// <summary>
- /// Ten tasks in a tree (fan-out 3): for i = 1..10 the master broadcasts i, each
- /// of the nine other tasks replies with the i-th triangle number, and the
- /// master must reduce them to nine times that triangle number.
- /// </summary>
- [TestMethod]
- public void TestBroadcastReduceOperators()
- {
-     string groupName = "group1";
-     string broadcastOperatorName = "broadcast";
-     string reduceOperatorName = "reduce";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 10;
-     int fanOut = 3;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddBroadcast<int, IntCodec>(
-             broadcastOperatorName,
-             masterTaskId,
-             TopologyTypes.Tree)
-         .AddReduce<int, IntCodec>(
-             reduceOperatorName,
-             masterTaskId,
-             new SumFunction(),
-             TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     // Operators of the master task.
-     IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
-     IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
-     // Operators of the other tasks; index k belongs to task (k + 1).
-     var broadcastReceivers = new List<IBroadcastReceiver<int>>();
-     var triangleNumberSenders = new List<IReduceSender<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         broadcastReceivers.Add(commGroups[task].GetBroadcastReceiver<int>(broadcastOperatorName));
-         triangleNumberSenders.Add(commGroups[task].GetReduceSender<int>(reduceOperatorName));
-     }
-
-     for (int i = 1; i <= 10; i++)
-     {
-         broadcastSender.Send(i);
-
-         // Every task receives first; the values are checked afterwards.
-         var received = new List<int>();
-         foreach (IBroadcastReceiver<int> broadcastReceiver in broadcastReceivers)
-         {
-             received.Add(broadcastReceiver.Receive());
-         }
-
-         foreach (int value in received)
-         {
-             Assert.AreEqual(i, value);
-         }
-
-         // Replies are sent in descending task order (task9 first, task1 last).
-         for (int task = numTasks - 1; task >= 1; task--)
-         {
-             int triangleNum = GroupCommunicationTests.TriangleNumber(received[task - 1]);
-             triangleNumberSenders[task - 1].Send(triangleNum);
-         }
-
-         int sum = sumReducer.Reduce();
-         int expected = GroupCommunicationTests.TriangleNumber(i) * (numTasks - 1);
-
-         // Expected value goes first so a failure message labels the values correctly.
-         Assert.AreEqual(expected, sum);
-     }
- }
-
- /// <summary>
- /// Five tasks in a tree (fan-out 2): scatters { 1, 2, 3, 4 } from the master and
- /// checks the leading elements seen by tasks 1 and 2 and the single element
- /// seen by tasks 3 and 4.
- /// </summary>
- [TestMethod]
- public void TestScatterOperator()
- {
-     string groupName = "group1";
-     string operatorName = "scatter";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 5;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(new List<int> { 1, 2, 3, 4 });
-
-     // Leading elements expected at task1 and task2, in that receive order.
-     int[][] expectedLeading = { new[] { 1, 2 }, new[] { 3, 4 } };
-     for (int k = 0; k < expectedLeading.Length; k++)
-     {
-         int[] actual = receivers[k].Receive().ToArray();
-         for (int j = 0; j < expectedLeading[k].Length; j++)
-         {
-             Assert.AreEqual(expectedLeading[k][j], actual[j]);
-         }
-     }
-
-     Assert.AreEqual(1, receivers[2].Receive().Single());
-     Assert.AreEqual(2, receivers[3].Receive().Single());
- }
-
- /// <summary>
- /// Five tasks in a tree (fan-out 2): scatters 1..8 from the master and checks
- /// the first four elements at tasks 1 and 2 and the first/last element at
- /// tasks 3 and 4.
- /// </summary>
- [TestMethod]
- public void TestScatterOperator2()
- {
-     string groupName = "group1";
-     string operatorName = "scatter";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 5;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
-
-     // Leading elements expected at task1 and task2, in that receive order.
-     int[][] expectedLeading = { new[] { 1, 2, 3, 4 }, new[] { 5, 6, 7, 8 } };
-     for (int k = 0; k < expectedLeading.Length; k++)
-     {
-         int[] actual = receivers[k].Receive().ToArray();
-         for (int j = 0; j < expectedLeading[k].Length; j++)
-         {
-             Assert.AreEqual(expectedLeading[k][j], actual[j]);
-         }
-     }
-
-     // For task3 and task4 only the first and last received elements are checked.
-     int[][] expectedEnds = { new[] { 1, 2 }, new[] { 3, 4 } };
-     for (int k = 0; k < expectedEnds.Length; k++)
-     {
-         var chunk = receivers[k + 2].Receive();
-         Assert.AreEqual(expectedEnds[k][0], chunk.First());
-         Assert.AreEqual(expectedEnds[k][1], chunk.Last());
-     }
- }
-
- /// <summary>
- /// Four tasks in a tree (fan-out 2): scatters 1..8 from the master and checks
- /// the first four elements received by tasks 1, 2 and 3.
- /// </summary>
- [TestMethod]
- public void TestScatterOperator3()
- {
-     string groupName = "group1";
-     string operatorName = "scatter";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 4;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
-
-     // Leading elements expected at task1, task2 and task3, in that receive order.
-     int[][] expectedLeading =
-     {
-         new[] { 1, 2, 3, 4 },
-         new[] { 5, 6, 7, 8 },
-         new[] { 1, 2, 3, 4 }
-     };
-     for (int k = 0; k < expectedLeading.Length; k++)
-     {
-         int[] actual = receivers[k].Receive().ToArray();
-         for (int j = 0; j < expectedLeading[k].Length; j++)
-         {
-             Assert.AreEqual(expectedLeading[k][j], actual[j]);
-         }
-     }
- }
-
- /// <summary>
- /// Four tasks in a tree (fan-out 2): scatters 1..8 with the explicit order
- /// { "task2", "task1" } and checks the first four elements received by
- /// task2, task1 and task3 (read in that order).
- /// </summary>
- [TestMethod]
- public void TestScatterOperator4()
- {
-     string groupName = "group1";
-     string operatorName = "scatter";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 4;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(
-         new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 },
-         new List<string> { "task2", "task1" });
-
-     // Tasks are read in the order task2, task1, task3.
-     int[] receiveOrder = { 2, 1, 3 };
-     int[][] expectedLeading =
-     {
-         new[] { 1, 2, 3, 4 },
-         new[] { 5, 6, 7, 8 },
-         new[] { 5, 6, 7, 8 }
-     };
-     for (int k = 0; k < receiveOrder.Length; k++)
-     {
-         int[] actual = receivers[receiveOrder[k] - 1].Receive().ToArray();
-         for (int j = 0; j < expectedLeading[k].Length; j++)
-         {
-             Assert.AreEqual(expectedLeading[k][j], actual[j]);
-         }
-     }
- }
-
- /// <summary>
- /// Six tasks in a tree (fan-out 2): scatters 1..8 from the master and checks
- /// the leading elements received by tasks 1 through 5.
- /// </summary>
- [TestMethod]
- public void TestScatterOperator5()
- {
-     string groupName = "group1";
-     string operatorName = "scatter";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 6;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
-         .Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
-
-     // receivers[k] belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(operatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 });
-
-     // Leading elements expected at task1..task5, in that receive order.
-     int[][] expectedLeading =
-     {
-         new[] { 1, 2, 3, 4 },
-         new[] { 5, 6, 7, 8 },
-         new[] { 1, 2 },
-         new[] { 3, 4 },
-         new[] { 5, 6, 7, 8 }
-     };
-     for (int k = 0; k < expectedLeading.Length; k++)
-     {
-         int[] actual = receivers[k].Receive().ToArray();
-         for (int j = 0; j < expectedLeading[k].Length; j++)
-         {
-             Assert.AreEqual(expectedLeading[k][j], actual[j]);
-         }
-     }
- }
-
- /// <summary>
- /// Five tasks in a tree (fan-out 2): scatters 1..100 from the master, has every
- /// other task send back the sum of what it received, and checks that the
- /// master reduces those sums to 6325.
- /// </summary>
- [TestMethod]
- public void TestScatterReduceOperators()
- {
-     string groupName = "group1";
-     string scatterOperatorName = "scatter";
-     string reduceOperatorName = "reduce";
-     string masterTaskId = "task0";
-     string driverId = "Driver Id";
-     int numTasks = 5;
-     int fanOut = 2;
-
-     var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
-     ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
-         .AddScatter<int, IntCodec>(
-             scatterOperatorName,
-             masterTaskId,
-             TopologyTypes.Tree)
-         .AddReduce<int, IntCodec>(
-             reduceOperatorName,
-             masterTaskId,
-             new SumFunction(),
-             TopologyTypes.Tree).Build();
-
-     var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
-     // Operators of the master task.
-     IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
-     IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
-     // Operators of the other tasks; index k belongs to task (k + 1).
-     var receivers = new List<IScatterReceiver<int>>();
-     var sumSenders = new List<IReduceSender<int>>();
-     for (int task = 1; task < numTasks; task++)
-     {
-         receivers.Add(commGroups[task].GetScatterReceiver<int>(scatterOperatorName));
-         sumSenders.Add(commGroups[task].GetReduceSender<int>(reduceOperatorName));
-     }
-
-     Assert.IsNotNull(sender);
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         Assert.IsNotNull(scatterReceiver);
-     }
-
-     sender.Send(Enumerable.Range(1, 100).ToList());
-
-     // Every task receives its chunk before any partial sum is sent.
-     var chunks = new List<List<int>>();
-     foreach (IScatterReceiver<int> scatterReceiver in receivers)
-     {
-         chunks.Add(scatterReceiver.Receive());
-     }
-
-     // Partial sums are sent in the order task3, task4, task2, task1.
-     foreach (int task in new[] { 3, 4, 2, 1 })
-     {
-         sumSenders[task - 1].Send(chunks[task - 1].Sum());
-     }
-
-     int sum = sumReducer.Reduce();
-
-     // Expected value goes first so a failure message labels the values correctly.
-     Assert.AreEqual(6325, sum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
deleted file mode 100644
index 927dfa7..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using System.Threading;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class NameServerTests
- {
- /// <summary>
- /// Starts a name server on port 0 and disposes it without issuing any request.
- /// </summary>
- [TestMethod]
- public void TestNameServerNoRequests()
- {
-     using (var idleServer = new NameServer(0))
-     {
-         // Intentionally empty: only construction and disposal are exercised.
-     }
- }
-
- /// <summary>
- /// Two clients share one name server: the second registers identifier "1" and
- /// the first looks it up. Passes when no call throws.
- /// </summary>
- [TestMethod]
- public void TestNameServerNoRequestsTwoClients()
- {
-     using (var server = new NameServer(0))
-     {
-         // Dispose both clients, as the other tests in this class do with their name client.
-         using (var nameClient = new NameClient(server.LocalEndpoint))
-         using (var nameClient2 = new NameClient(server.LocalEndpoint))
-         {
-             nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
-             nameClient.Lookup("1");
-         }
-     }
- }
-
- /// <summary>
- /// Same scenario as TestNameServerNoRequestsTwoClients: the second client
- /// registers identifier "1" and the first looks it up. Passes when no call throws.
- /// </summary>
- [TestMethod]
- public void TestNameServerNoRequestsTwoClients2()
- {
-     using (var server = new NameServer(0))
-     {
-         // Dispose both clients, as the other tests in this class do with their name client.
-         using (var nameClient = new NameClient(server.LocalEndpoint))
-         using (var nameClient2 = new NameClient(server.LocalEndpoint))
-         {
-             nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
-             nameClient.Lookup("1");
-         }
-     }
- }
-
- /// <summary>
- /// Two clients share one name server: the first registers identifier "1" and
- /// the second looks it up. Passes when no call throws.
- /// </summary>
- [TestMethod]
- public void TestNameServerMultipleRequestsTwoClients()
- {
-     using (var server = new NameServer(0))
-     {
-         // Dispose both clients, as the other tests in this class do with their name client.
-         using (var nameClient = new NameClient(server.LocalEndpoint))
-         using (var nameClient2 = new NameClient(server.LocalEndpoint))
-         {
-             nameClient.Register("1", new IPEndPoint(IPAddress.Any, 8080));
-             nameClient2.Lookup("1");
-         }
-     }
- }
-
- /// <summary>
- /// Identifiers "a", "b" and "c" resolve to null before registration and to
- /// their registered endpoints afterwards.
- /// </summary>
- [TestMethod]
- public void TestRegister()
- {
-     using (INameServer server = BuildNameServer())
-     using (INameClient client = BuildNameClient(server.LocalEndpoint))
-     {
-         string[] ids = { "a", "b", "c" };
-         IPEndPoint[] endpoints =
-         {
-             new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100),
-             new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200),
-             new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300)
-         };
-
-         // Nothing is registered yet.
-         foreach (string id in ids)
-         {
-             Assert.IsNull(client.Lookup(id));
-         }
-
-         // Register every identifier with its endpoint.
-         for (int k = 0; k < ids.Length; k++)
-         {
-             client.Register(ids[k], endpoints[k]);
-         }
-
-         // Each identifier now resolves to the endpoint it was registered with.
-         for (int k = 0; k < ids.Length; k++)
-         {
-             Assert.AreEqual(endpoints[k], client.Lookup(ids[k]));
-         }
-     }
- }
-
- /// <summary>
- /// A registered identifier resolves to its endpoint, and resolves to null
- /// again once it has been unregistered.
- /// </summary>
- [TestMethod]
- public void TestUnregister()
- {
-     using (INameServer server = BuildNameServer())
-     using (INameClient client = BuildNameClient(server.LocalEndpoint))
-     {
-         IPEndPoint registered = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
-
-         // Register and confirm the lookup succeeds.
-         client.Register("a", registered);
-         Assert.AreEqual(registered, client.Lookup("a"));
-
-         // Unregister, then wait one second before looking it up again
-         // (presumably to let the server process the request - confirm).
-         client.Unregister("a");
-         Thread.Sleep(1000);
-
-         Assert.IsNull(client.Lookup("a"));
-     }
- }
-
- /// <summary>
- /// Registering identifier "a" a second time replaces the endpoint returned by lookup.
- /// </summary>
- [TestMethod]
- public void TestLookup()
- {
-     using (INameServer server = BuildNameServer())
-     using (INameClient client = BuildNameClient(server.LocalEndpoint))
-     {
-         IPEndPoint first = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
-         IPEndPoint second = new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200);
-
-         // Initial registration of identifier a.
-         client.Register("a", first);
-         Assert.AreEqual(first, client.Lookup("a"));
-
-         // Re-register identifier a with a different endpoint.
-         client.Register("a", second);
-         Assert.AreEqual(second, client.Lookup("a"));
-     }
- }
-
- /// <summary>
- /// A batch lookup of "a", "b", "c" and the unregistered "d" returns exactly the
- /// three registered assignments, in request order.
- /// </summary>
- [TestMethod]
- public void TestLookupList()
- {
-     using (INameServer server = BuildNameServer())
-     using (INameClient client = BuildNameClient(server.LocalEndpoint))
-     {
-         string[] registeredIds = { "a", "b", "c" };
-         IPEndPoint[] endpoints =
-         {
-             new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100),
-             new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200),
-             new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300)
-         };
-
-         // Register endpoints
-         for (int k = 0; k < registeredIds.Length; k++)
-         {
-             client.Register(registeredIds[k], endpoints[k]);
-         }
-
-         // Look everything up in one request, including the unknown "d"
-         List<NameAssignment> assignments = client.Lookup(new List<string> { "a", "b", "c", "d" });
-
-         // Check that a, b, and c are registered
-         for (int k = 0; k < registeredIds.Length; k++)
-         {
-             Assert.AreEqual(registeredIds[k], assignments[k].Identifier);
-             Assert.AreEqual(endpoints[k], assignments[k].Endpoint);
-         }
-
-         // Check that d is not registered
-         Assert.AreEqual(3, assignments.Count);
-     }
- }
-
- /// <summary>
- /// A name client keeps working after its name server is replaced by a new one
- /// on a different port and the client is restarted against the new endpoint.
- /// </summary>
- [TestMethod]
- public void TestNameClientRestart()
- {
-     int oldPort = 6666;
-     int newPort = 6662;
-     INameServer server = new NameServer(oldPort);
-
-     try
-     {
-         using (INameClient client = BuildNameClient(server.LocalEndpoint))
-         {
-             IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
-
-             client.Register("a", endpoint);
-             Assert.AreEqual(endpoint, client.Lookup("a"));
-
-             // Shut the first server down; clear the reference so the finally
-             // block never disposes it a second time if the next line throws.
-             server.Dispose();
-             server = null;
-
-             server = new NameServer(newPort);
-             client.Restart(server.LocalEndpoint);
-
-             client.Register("b", endpoint);
-             Assert.AreEqual(endpoint, client.Lookup("b"));
-         }
-     }
-     finally
-     {
-         // Dispose the current server even when an assertion or call above
-         // failed, so a fixed port is not left bound for later tests.
-         if (server != null)
-         {
-             server.Dispose();
-         }
-     }
- }
-
- /// <summary>
- /// Tang can inject a <c>NameClient</c> into a constructor when the name server
- /// address and port are supplied through <c>NamingConfiguration</c>.
- /// </summary>
- [TestMethod]
- public void TestConstructorInjection()
- {
-     // Port 0 requests an ephemeral port, so this test cannot collide with
-     // TestNameClientRestart (which binds 6666) or with other processes.
-     using (INameServer server = new NameServer(0))
-     {
-         IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule
-             .Set(NamingConfiguration.NameServerAddress, server.LocalEndpoint.Address.ToString())
-             .Set(NamingConfiguration.NameServerPort, server.LocalEndpoint.Port + string.Empty)
-             .Build();
-
-         ConstructorInjection c = TangFactory.GetTang()
-             .NewInjector(nameClientConfiguration)
-             .GetInstance<ConstructorInjection>();
-
-         Assert.IsNotNull(c);
-     }
- }
-
- /// <summary>
- /// Obtains an <c>INameServer</c> from Tang with the name server port
- /// parameter bound to "0".
- /// </summary>
- /// <returns>The injected name server instance.</returns>
- private INameServer BuildNameServer()
- {
-     var portBinding = TangFactory.GetTang()
-         .NewConfigurationBuilder()
-         .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
-             GenericType<NamingConfigurationOptions.NameServerPort>.Class, "0");
-
-     var injector = TangFactory.GetTang().NewInjector(portBinding.Build());
-     return injector.GetInstance<INameServer>();
- }
-
- /// <summary>
- /// Obtains a <c>NameClient</c> from Tang, configured with the address and
- /// port of the given name server endpoint.
- /// </summary>
- /// <param name="remoteEndpoint">Endpoint of the name server to connect to.</param>
- /// <returns>The injected name client.</returns>
- private INameClient BuildNameClient(IPEndPoint remoteEndpoint)
- {
-     IConfiguration clientConfiguration = NamingConfiguration.ConfigurationModule
-         .Set(NamingConfiguration.NameServerAddress, remoteEndpoint.Address.ToString())
-         .Set(NamingConfiguration.NameServerPort, remoteEndpoint.Port + string.Empty)
-         .Build();
-
-     var injector = TangFactory.GetTang().NewInjector(clientConfiguration);
-     return injector.GetInstance<NameClient>();
- }
-
- /// <summary>
- /// Injection target whose constructor only accepts a non-null <c>NameClient</c>.
- /// </summary>
- private class ConstructorInjection
- {
-     /// <summary>
-     /// Validates that a name client was supplied.
-     /// </summary>
-     /// <param name="client">The injected name client.</param>
-     /// <exception cref="ArgumentNullException">When <paramref name="client"/> is null.</exception>
-     [Inject]
-     public ConstructorInjection(NameClient client)
-     {
-         if (client != null)
-         {
-             return;
-         }
-
-         throw new ArgumentNullException("client");
-     }
- }
- }
-}