You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/13 18:24:37 UTC
[2/3] incubator-reef git commit: [REEF-245] Move Network test from
REEF.Tests to REEF.Network.Tests project
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
new file mode 100644
index 0000000..f67e0ea
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <ProjectGuid>{2D30B07C-4DDC-4932-A293-C5CAB6BE34D9}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Network.Tests</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Network.Tests</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ <ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
+ <VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">10.0</VisualStudioVersion>
+ <VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
+ <ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
+ <IsCodedUITest>False</IsCodedUITest>
+ <TestProjectType>UnitTest</TestProjectType>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\build.props" />
+ <PropertyGroup>
+ <BuildPackage>false</BuildPackage>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
+ <Reference Include="System" />
+ <Reference Include="System.Reactive.Core">
+ <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BlockingCollectionExtensionTests.cs" />
+ <Compile Include="GroupCommunication\GroupCommunicationTests.cs" />
+ <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
+ <Compile Include="NamingService\NameServerTests.cs" />
+ <Compile Include="NetworkService\NetworkServiceTests.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+ <Name>Org.Apache.REEF.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+ <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
+ <Name>Org.Apache.REEF.Network</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..7754581
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network.Tests")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("146f73c7-6a44-4e98-9b4a-be2584930a66")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/packages.config b/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
new file mode 100644
index 0000000..75e5b34
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
deleted file mode 100644
index 1737888..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/BlockingCollectionExtensionTests.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.Utilities;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class BlockingCollectionExtensionTests
- {
- [TestMethod]
- public void TestCollectionContainsElement()
- {
- string item = "abc";
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add(item);
-
- Assert.AreEqual(item, collection.Take(item));
-
- // Check that item is no longer in collection
- Assert.AreEqual(0, collection.Count);
- }
-
- [TestMethod]
- public void TestCollectionContainsElement2()
- {
- string item = "abc";
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add("cat");
- collection.Add(item);
- collection.Add("dog");
-
- Assert.AreEqual(item, collection.Take(item));
-
- // Remove remaining items, check that item is not there
- Assert.AreNotEqual(item, collection.Take());
- Assert.AreNotEqual(item, collection.Take());
- Assert.AreEqual(0, collection.Count);
- }
-
- [TestMethod]
- [ExpectedException(typeof(InvalidOperationException))]
- public void TestCollectionDoesNotContainsElement()
- {
- string item1 = "abc";
- string item2 = "def";
-
- BlockingCollection<string> collection = new BlockingCollection<string>();
- collection.Add(item2);
-
- // Should throw InvalidOperationException since item1 is not in collection
- collection.Take(item1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
deleted file mode 100644
index 5f931f2..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
+++ /dev/null
@@ -1,742 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Reactive;
-using System.Text;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Codec;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class GroupCommunicationTests
- {
- [TestMethod]
- public void TestSender()
- {
- using (NameServer nameServer = new NameServer(0))
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>();
- BlockingCollection<GroupCommunicationMessage> messages2 = new BlockingCollection<GroupCommunicationMessage>();
-
- var handler1 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
- msg => messages1.Add(msg.Data.First()));
- var handler2 = Observer.Create<NsMessage<GroupCommunicationMessage>>(
- msg => messages2.Add(msg.Data.First()));
-
- var networkService1 = BuildNetworkService(endpoint, handler1);
- var networkService2 = BuildNetworkService(endpoint, handler2);
-
- networkService1.Register(new StringIdentifier("id1"));
- networkService2.Register(new StringIdentifier("id2"));
-
- Sender sender1 = new Sender(networkService1, new StringIdentifierFactory());
- Sender sender2 = new Sender(networkService2, new StringIdentifierFactory());
-
- sender1.Send(CreateGcm("abc", "id1", "id2"));
- sender1.Send(CreateGcm("def", "id1", "id2"));
-
- sender2.Send(CreateGcm("ghi", "id2", "id1"));
-
- string msg1 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
- string msg2 = Encoding.UTF8.GetString(messages2.Take().Data[0]);
- Assert.AreEqual("abc", msg1);
- Assert.AreEqual("def", msg2);
-
- string msg3 = Encoding.UTF8.GetString(messages1.Take().Data[0]);
- Assert.AreEqual("ghi", msg3);
- }
- }
-
- [TestMethod]
- public void TestBroadcastReduceOperators()
- {
- string groupName = "group1";
- string broadcastOperatorName = "broadcast";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 3;
- int fanOut = 2;
-
- var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(
- broadcastOperatorName,
- masterTaskId)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction())
- .Build();
-
- var commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- //for master task
- IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- for (int j = 1; j <= 10; j++)
- {
- broadcastSender.Send(j);
-
- int n1 = broadcastReceiver1.Receive();
- int n2 = broadcastReceiver2.Receive();
- Assert.AreEqual(j, n1);
- Assert.AreEqual(j, n2);
-
- int triangleNum1 = TriangleNumber(n1);
- triangleNumberSender1.Send(triangleNum1);
- int triangleNum2 = TriangleNumber(n2);
- triangleNumberSender2.Send(triangleNum2);
-
- int sum = sumReducer.Reduce();
- int expected = TriangleNumber(j) * (numTasks - 1);
- Assert.AreEqual(sum, expected);
- }
- }
-
- [TestMethod]
- public void TestScatterReduceOperators()
- {
- string groupName = "group1";
- string scatterOperatorName = "scatter";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(
- scatterOperatorName,
- masterTaskId)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = Enumerable.Range(1, 100).ToList();
- List<string> order = new List<string> {"task4", "task3", "task2", "task1"};
-
- sender.Send(data, order);
-
- ScatterReceiveReduce(receiver4, sumSender4);
- ScatterReceiveReduce(receiver3, sumSender3);
- ScatterReceiveReduce(receiver2, sumSender2);
- ScatterReceiveReduce(receiver1, sumSender1);
-
- int sum = sumReducer.Reduce();
-
- Assert.AreEqual(sum, data.Sum());
- }
-
- [TestMethod]
- public void TestBroadcastOperator()
- {
- NameServer nameServer = new NameServer(0);
-
- string groupName = "group1";
- string operatorName = "broadcast";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 10;
- int value = 1337;
- int fanOut = 3;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value);
- Assert.AreEqual(value, receiver1.Receive());
- Assert.AreEqual(value, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestBroadcastOperatorWithDefaultCodec()
- {
- NameServer nameServer = new NameServer(0);
-
- string groupName = "group1";
- string operatorName = "broadcast";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 10;
- int value = 1337;
- int fanOut = 3;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value);
- Assert.AreEqual(value, receiver1.Receive());
- Assert.AreEqual(value, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestBroadcastOperator2()
- {
- string groupName = "group1";
- string operatorName = "broadcast";
- string driverId = "driverId";
- string masterTaskId = "task0";
- int numTasks = 3;
- int value1 = 1337;
- int value2 = 42;
- int value3 = 99;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
-
- sender.Send(value1);
- Assert.AreEqual(value1, receiver1.Receive());
- Assert.AreEqual(value1, receiver2.Receive());
-
- sender.Send(value2);
- Assert.AreEqual(value2, receiver1.Receive());
- Assert.AreEqual(value2, receiver2.Receive());
-
- sender.Send(value3);
- Assert.AreEqual(value3, receiver1.Receive());
- Assert.AreEqual(value3, receiver2.Receive());
- }
-
- [TestMethod]
- public void TestReduceOperator()
- {
- string groupName = "group1";
- string operatorName = "reduce";
- int numTasks = 4;
- string driverId = "driverid";
- string masterTaskId = "task0";
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
- IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
-
- Assert.IsNotNull(receiver);
- Assert.IsNotNull(sender1);
- Assert.IsNotNull(sender2);
- Assert.IsNotNull(sender3);
-
- sender3.Send(5);
- sender1.Send(1);
- sender2.Send(3);
-
- Assert.AreEqual(9, receiver.Reduce());
- }
-
- [TestMethod]
- public void TestReduceOperator2()
- {
- string groupName = "group1";
- string operatorName = "reduce";
- int numTasks = 4;
- string driverId = "driverid";
- string masterTaskId = "task0";
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction())
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
- IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
-
- Assert.IsNotNull(receiver);
- Assert.IsNotNull(sender1);
- Assert.IsNotNull(sender2);
- Assert.IsNotNull(sender3);
-
- sender3.Send(5);
- sender1.Send(1);
- sender2.Send(3);
- Assert.AreEqual(9, receiver.Reduce());
-
- sender3.Send(6);
- sender1.Send(2);
- sender2.Send(4);
- Assert.AreEqual(12, receiver.Reduce());
-
- sender3.Send(9);
- sender1.Send(3);
- sender2.Send(6);
- Assert.AreEqual(18, receiver.Reduce());
- }
-
- [TestMethod]
- public void TestScatterOperator()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4 };
-
- sender.Send(data);
- Assert.AreEqual(1, receiver1.Receive().Single());
- Assert.AreEqual(2, receiver2.Receive().Single());
- Assert.AreEqual(3, receiver3.Receive().Single());
- Assert.AreEqual(4, receiver4.Receive().Single());
- }
-
- [TestMethod]
- public void TestScatterOperatorWithDefaultCodec()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4 };
-
- sender.Send(data);
- Assert.AreEqual(1, receiver1.Receive().Single());
- Assert.AreEqual(2, receiver2.Receive().Single());
- Assert.AreEqual(3, receiver3.Receive().Single());
- Assert.AreEqual(4, receiver4.Receive().Single());
- }
-
- [TestMethod]
- public void TestScatterOperator2()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
- var data1 = receiver1.Receive();
- Assert.AreEqual(1, data1.First());
- Assert.AreEqual(2, data1.Last());
-
- var data2 = receiver2.Receive();
- Assert.AreEqual(3, data2.First());
- Assert.AreEqual(4, data2.Last());
-
- var data3 = receiver3.Receive();
- Assert.AreEqual(5, data3.First());
- Assert.AreEqual(6, data3.Last());
-
- var data4 = receiver4.Receive();
- Assert.AreEqual(7, data4.First());
- Assert.AreEqual(8, data4.Last());
- }
-
- [TestMethod]
- public void TestScatterOperator3()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, data1[0]);
- Assert.AreEqual(2, data1[1]);
- Assert.AreEqual(3, data1[2]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(4, data2[0]);
- Assert.AreEqual(5, data2[1]);
- Assert.AreEqual(6, data2[2]);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(7, data3[0]);
- Assert.AreEqual(8, data3[1]);
- }
-
- [TestMethod]
- public void TestScatterOperator4()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- var commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId)
- .Build();
-
- List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
- List<string> order = new List<string> { "task3", "task2", "task1" };
-
- sender.Send(data, order);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(1, data3[0]);
- Assert.AreEqual(2, data3[1]);
- Assert.AreEqual(3, data3[2]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(4, data2[0]);
- Assert.AreEqual(5, data2[1]);
- Assert.AreEqual(6, data2[2]);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(7, data1[0]);
- Assert.AreEqual(8, data1[1]);
- }
-
- [TestMethod]
- public void TestConfigurationBroadcastSpec()
- {
- FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
- new BroadcastOperatorSpec<int, IntCodec>("Sender"));
-
- topology.AddTask("task1");
- var conf = topology.GetTaskConfiguration("task1");
-
- ICodec<int> codec = TangFactory.GetTang().NewInjector(conf).GetInstance<ICodec<int>>();
- Assert.AreEqual(3, codec.Decode(codec.Encode(3)));
- }
-
- [TestMethod]
- public void TestConfigurationReduceSpec()
- {
- FlatTopology<int, IntCodec> topology = new FlatTopology<int, IntCodec>("Operator", "Group", "task1", "driverid",
- new ReduceOperatorSpec<int, IntCodec>("task1", new SumFunction()));
-
- topology.AddTask("task1");
- var conf2 = topology.GetTaskConfiguration("task1");
-
- IReduceFunction<int> reduceFunction = TangFactory.GetTang().NewInjector(conf2).GetInstance<IReduceFunction<int>>();
- Assert.AreEqual(10, reduceFunction.Reduce(new int[] { 1, 2, 3, 4 }));
- }
-
- public static IMpiDriver GetInstanceOfMpiDriver(string driverId, string masterTaskId, string groupName, int fanOut, int numTasks)
- {
- var c = TangFactory.GetTang().NewConfigurationBuilder()
- .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId)
- .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId)
- .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName)
- .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString())
- .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString())
- .BindImplementation(GenericType<IConfigurationSerializer>.Class, GenericType<AvroConfigurationSerializer>.Class)
- .Build();
-
- IMpiDriver mpiDriver = TangFactory.GetTang().NewInjector(c).GetInstance<MpiDriver>();
- return mpiDriver;
- }
-
- public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IMpiDriver mpiDriver, ICommunicationGroupDriver commGroup)
- {
- List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>();
- IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
- List<IConfiguration> partialConfigs = new List<IConfiguration>();
- for (int i = 0; i < numTasks; i++)
- {
- string taskId = "task" + i;
- IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, taskId)
- .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
- .Build())
- .Build();
- commGroup.AddTask(taskId);
- partialConfigs.Add(partialTaskConfig);
- }
-
- for (int i = 0; i < numTasks; i++)
- {
- string taskId = "task" + i;
- IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId);
- IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
- IInjector injector = TangFactory.GetTang().NewInjector(mergedConf);
-
- IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
- commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
- }
- return commGroups;
- }
-
- public static NetworkService<GroupCommunicationMessage> BuildNetworkService(
- IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler)
- {
- return new NetworkService<GroupCommunicationMessage>(
- 0, handler, new StringIdentifierFactory(), new GroupCommunicationMessageCodec(), new NameClient(nameServerEndpoint.Address.ToString(), nameServerEndpoint.Port));
- }
-
- private GroupCommunicationMessage CreateGcm(string message, string from, string to)
- {
- byte[] data = Encoding.UTF8.GetBytes(message);
- return new GroupCommunicationMessage("g1", "op1", from, to, data, MessageType.Data);
- }
-
- private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender)
- {
- List<int> data1 = receiver.Receive();
- int sum1 = data1.Sum();
- sumSender.Send(sum1);
- }
-
- public static int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-
- public class SumFunction : IReduceFunction<int>
- {
- [Inject]
- public SumFunction()
- {
- }
-
- public int Reduce(IEnumerable<int> elements)
- {
- return elements.Sum();
- }
- }
-
- public class MyTask : ITask
- {
- public void Dispose()
- {
- throw new NotImplementedException();
- }
-
- public byte[] Call(byte[] memento)
- {
- throw new NotImplementedException();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
deleted file mode 100644
index 667e45f..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
+++ /dev/null
@@ -1,634 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Linq;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class GroupCommunicationTreeTopologyTests
- {
- [TestMethod]
- public void TestTreeTopology()
- {
- TreeTopology<int, IntCodec> topology = new TreeTopology<int, IntCodec>("Operator", "Operator", "task1", "driverid",
- new BroadcastOperatorSpec<int, IntCodec>("task1"), 2);
- for (int i = 1; i < 8; i++)
- {
- string taskid = "task" + i;
- topology.AddTask(taskid);
- }
-
- for (int i = 1; i < 8; i++)
- {
- var conf = topology.GetTaskConfiguration("task" + i);
- }
- }
-
- [TestMethod]
- public void TestReduceOperator()
- {
- string groupName = "group1";
- string operatorName = "reduce";
- int numTasks = 10;
- string driverId = "driverId";
- string masterTaskId = "task0";
- int fanOut = 3;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddReduce<int, IntCodec>(operatorName, masterTaskId, new SumFunction(), TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
- IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender4 = commGroups[4].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender5 = commGroups[5].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender6 = commGroups[6].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender7 = commGroups[7].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender8 = commGroups[8].GetReduceSender<int>(operatorName);
- IReduceSender<int> sender9 = commGroups[9].GetReduceSender<int>(operatorName);
-
- Assert.IsNotNull(receiver);
- Assert.IsNotNull(sender1);
- Assert.IsNotNull(sender2);
- Assert.IsNotNull(sender3);
- Assert.IsNotNull(sender4);
- Assert.IsNotNull(sender5);
- Assert.IsNotNull(sender6);
- Assert.IsNotNull(sender7);
- Assert.IsNotNull(sender8);
- Assert.IsNotNull(sender9);
-
- sender9.Send(9);
- sender8.Send(8);
- sender7.Send(7);
- sender6.Send(6);
- sender5.Send(5);
- sender4.Send(4);
- sender3.Send(3);
- sender2.Send(2);
- sender1.Send(1);
-
- Assert.AreEqual(45, receiver.Reduce());
- }
-
- [TestMethod]
- public void TestBroadcastOperator()
- {
- string groupName = "group1";
- string operatorName = "broadcast";
- string driverId = "driverId";
- string masterTaskId = "task0";
- int numTasks = 10;
- int value1 = 1337;
- int value2 = 42;
- int value3 = 99;
- int fanOut = 3;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
- IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver3 = commGroups[3].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver4 = commGroups[4].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver5 = commGroups[5].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver6 = commGroups[6].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver7 = commGroups[7].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver8 = commGroups[8].GetBroadcastReceiver<int>(operatorName);
- IBroadcastReceiver<int> receiver9 = commGroups[9].GetBroadcastReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
- Assert.IsNotNull(receiver5);
- Assert.IsNotNull(receiver6);
- Assert.IsNotNull(receiver7);
- Assert.IsNotNull(receiver8);
- Assert.IsNotNull(receiver9);
-
- sender.Send(value1);
- Assert.AreEqual(value1, receiver1.Receive());
- Assert.AreEqual(value1, receiver2.Receive());
- Assert.AreEqual(value1, receiver3.Receive());
- Assert.AreEqual(value1, receiver4.Receive());
- Assert.AreEqual(value1, receiver5.Receive());
- Assert.AreEqual(value1, receiver6.Receive());
- Assert.AreEqual(value1, receiver7.Receive());
- Assert.AreEqual(value1, receiver8.Receive());
- Assert.AreEqual(value1, receiver9.Receive());
-
- sender.Send(value2);
- Assert.AreEqual(value2, receiver1.Receive());
- Assert.AreEqual(value2, receiver2.Receive());
- Assert.AreEqual(value2, receiver3.Receive());
- Assert.AreEqual(value2, receiver4.Receive());
- Assert.AreEqual(value2, receiver5.Receive());
- Assert.AreEqual(value2, receiver6.Receive());
- Assert.AreEqual(value2, receiver7.Receive());
- Assert.AreEqual(value2, receiver8.Receive());
- Assert.AreEqual(value2, receiver9.Receive());
-
- sender.Send(value3);
- Assert.AreEqual(value3, receiver1.Receive());
- Assert.AreEqual(value3, receiver2.Receive());
- Assert.AreEqual(value3, receiver3.Receive());
- Assert.AreEqual(value3, receiver4.Receive());
- Assert.AreEqual(value3, receiver5.Receive());
- Assert.AreEqual(value3, receiver6.Receive());
- Assert.AreEqual(value3, receiver7.Receive());
- Assert.AreEqual(value3, receiver8.Receive());
- Assert.AreEqual(value3, receiver9.Receive());
- }
-
-
- [TestMethod]
- public void TestBroadcastReduceOperators()
- {
- string groupName = "group1";
- string broadcastOperatorName = "broadcast";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 10;
- int fanOut = 3;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(
- broadcastOperatorName,
- masterTaskId,
- TopologyTypes.Tree)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction(),
- TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- //for master task
- IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver3 = commGroups[3].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver4 = commGroups[4].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver5 = commGroups[5].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender5 = commGroups[5].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver6 = commGroups[6].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender6 = commGroups[6].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver7 = commGroups[7].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender7 = commGroups[7].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver8 = commGroups[8].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender8 = commGroups[8].GetReduceSender<int>(reduceOperatorName);
-
- IBroadcastReceiver<int> broadcastReceiver9 = commGroups[9].GetBroadcastReceiver<int>(broadcastOperatorName);
- IReduceSender<int> triangleNumberSender9 = commGroups[9].GetReduceSender<int>(reduceOperatorName);
-
- for (int i = 1; i <= 10; i++)
- {
- broadcastSender.Send(i);
-
- int n1 = broadcastReceiver1.Receive();
- int n2 = broadcastReceiver2.Receive();
- int n3 = broadcastReceiver3.Receive();
- int n4 = broadcastReceiver4.Receive();
- int n5 = broadcastReceiver5.Receive();
- int n6 = broadcastReceiver6.Receive();
- int n7 = broadcastReceiver7.Receive();
- int n8 = broadcastReceiver8.Receive();
- int n9 = broadcastReceiver9.Receive();
- Assert.AreEqual(i, n1);
- Assert.AreEqual(i, n2);
- Assert.AreEqual(i, n3);
- Assert.AreEqual(i, n4);
- Assert.AreEqual(i, n5);
- Assert.AreEqual(i, n6);
- Assert.AreEqual(i, n7);
- Assert.AreEqual(i, n8);
- Assert.AreEqual(i, n9);
-
- int triangleNum9 = GroupCommunicationTests.TriangleNumber(n9);
- triangleNumberSender9.Send(triangleNum9);
-
- int triangleNum8 = GroupCommunicationTests.TriangleNumber(n8);
- triangleNumberSender8.Send(triangleNum8);
-
- int triangleNum7 = GroupCommunicationTests.TriangleNumber(n7);
- triangleNumberSender7.Send(triangleNum7);
-
- int triangleNum6 = GroupCommunicationTests.TriangleNumber(n6);
- triangleNumberSender6.Send(triangleNum6);
-
- int triangleNum5 = GroupCommunicationTests.TriangleNumber(n5);
- triangleNumberSender5.Send(triangleNum5);
-
- int triangleNum4 = GroupCommunicationTests.TriangleNumber(n4);
- triangleNumberSender4.Send(triangleNum4);
-
- int triangleNum3 = GroupCommunicationTests.TriangleNumber(n3);
- triangleNumberSender3.Send(triangleNum3);
-
- int triangleNum2 = GroupCommunicationTests.TriangleNumber(n2);
- triangleNumberSender2.Send(triangleNum2);
-
- int triangleNum1 = GroupCommunicationTests.TriangleNumber(n1);
- triangleNumberSender1.Send(triangleNum1);
-
- int sum = sumReducer.Reduce();
- int expected = GroupCommunicationTests.TriangleNumber(i) * (numTasks - 1);
- Assert.AreEqual(sum, expected);
- }
- }
-
- [TestMethod]
- public void TestScatterOperator()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4 };
-
- sender.Send(data);
- var receved1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, receved1[0]);
- Assert.AreEqual(2, receved1[1]);
-
- var receved2 = receiver2.Receive().ToArray();
- Assert.AreEqual(3, receved2[0]);
- Assert.AreEqual(4, receved2[1]);
-
- Assert.AreEqual(1, receiver3.Receive().Single());
- Assert.AreEqual(2, receiver4.Receive().Single());
- }
-
- [TestMethod]
- public void TestScatterOperator2()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, data1[0]);
- Assert.AreEqual(2, data1[1]);
- Assert.AreEqual(3, data1[2]);
- Assert.AreEqual(4, data1[3]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(5, data2[0]);
- Assert.AreEqual(6, data2[1]);
- Assert.AreEqual(7, data2[2]);
- Assert.AreEqual(8, data2[3]);
-
- var data3 = receiver3.Receive();
- Assert.AreEqual(1, data3.First());
- Assert.AreEqual(2, data3.Last());
-
- var data4 = receiver4.Receive();
- Assert.AreEqual(3, data4.First());
- Assert.AreEqual(4, data4.Last());
- }
-
- [TestMethod]
- public void TestScatterOperator3()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, data1[0]);
- Assert.AreEqual(2, data1[1]);
- Assert.AreEqual(3, data1[2]);
- Assert.AreEqual(4, data1[3]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(5, data2[0]);
- Assert.AreEqual(6, data2[1]);
- Assert.AreEqual(7, data2[2]);
- Assert.AreEqual(8, data2[3]);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(1, data3[0]);
- Assert.AreEqual(2, data3[1]);
- Assert.AreEqual(3, data3[2]);
- Assert.AreEqual(4, data3[3]);
- }
-
- [TestMethod]
- public void TestScatterOperator4()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 4;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
- List<string> order = new List<string> { "task2", "task1" };
-
- sender.Send(data, order);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(1, data2[0]);
- Assert.AreEqual(2, data2[1]);
- Assert.AreEqual(3, data2[2]);
- Assert.AreEqual(4, data2[3]);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(5, data1[0]);
- Assert.AreEqual(6, data1[1]);
- Assert.AreEqual(7, data1[2]);
- Assert.AreEqual(8, data1[3]);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(5, data3[0]);
- Assert.AreEqual(6, data3[1]);
- Assert.AreEqual(7, data3[2]);
- Assert.AreEqual(8, data3[3]);
- }
-
- [TestMethod]
- public void TestScatterOperator5()
- {
- string groupName = "group1";
- string operatorName = "scatter";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 6;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree)
- .Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
- IScatterReceiver<int> receiver5 = commGroups[5].GetScatterReceiver<int>(operatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
- Assert.IsNotNull(receiver5);
-
- List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- sender.Send(data);
-
- var data1 = receiver1.Receive().ToArray();
- Assert.AreEqual(1, data1[0]);
- Assert.AreEqual(2, data1[1]);
- Assert.AreEqual(3, data1[2]);
- Assert.AreEqual(4, data1[3]);
-
- var data2 = receiver2.Receive().ToArray();
- Assert.AreEqual(5, data2[0]);
- Assert.AreEqual(6, data2[1]);
- Assert.AreEqual(7, data2[2]);
- Assert.AreEqual(8, data2[3]);
-
- var data3 = receiver3.Receive().ToArray();
- Assert.AreEqual(1, data3[0]);
- Assert.AreEqual(2, data3[1]);
-
- var data4= receiver4.Receive().ToArray();
- Assert.AreEqual(3, data4[0]);
- Assert.AreEqual(4, data4[1]);
-
- var data5 = receiver5.Receive().ToArray();
- Assert.AreEqual(5, data5[0]);
- Assert.AreEqual(6, data5[1]);
- Assert.AreEqual(7, data5[2]);
- Assert.AreEqual(8, data5[3]);
- }
-
- [TestMethod]
- public void TestScatterReduceOperators()
- {
- string groupName = "group1";
- string scatterOperatorName = "scatter";
- string reduceOperatorName = "reduce";
- string masterTaskId = "task0";
- string driverId = "Driver Id";
- int numTasks = 5;
- int fanOut = 2;
-
- var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
-
- ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
- .AddScatter<int, IntCodec>(
- scatterOperatorName,
- masterTaskId,
- TopologyTypes.Tree)
- .AddReduce<int, IntCodec>(
- reduceOperatorName,
- masterTaskId,
- new SumFunction(),
- TopologyTypes.Tree).Build();
-
- var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup);
-
- IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
- IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
-
- IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
- IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
-
- Assert.IsNotNull(sender);
- Assert.IsNotNull(receiver1);
- Assert.IsNotNull(receiver2);
- Assert.IsNotNull(receiver3);
- Assert.IsNotNull(receiver4);
-
- List<int> data = Enumerable.Range(1, 100).ToList();
-
- sender.Send(data);
-
- List<int> data1 = receiver1.Receive();
- List<int> data2 = receiver2.Receive();
-
- List<int> data3 = receiver3.Receive();
- List<int> data4 = receiver4.Receive();
-
- int sum3 = data3.Sum();
- sumSender3.Send(sum3);
-
- int sum4 = data4.Sum();
- sumSender4.Send(sum4);
-
- int sum2 = data2.Sum();
- sumSender2.Send(sum2);
-
- int sum1 = data1.Sum();
- sumSender1.Send(sum1);
-
- int sum = sumReducer.Reduce();
- Assert.AreEqual(sum, 6325);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/987d4f37/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
deleted file mode 100644
index 927dfa7..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Network/NameServerTests.cs
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using System.Threading;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-
-namespace Org.Apache.REEF.Tests.Network
-{
- [TestClass]
- public class NameServerTests
- {
- [TestMethod]
- public void TestNameServerNoRequests()
- {
- using (var server = new NameServer(0))
- {
- }
- }
-
- [TestMethod]
- public void TestNameServerNoRequestsTwoClients()
- {
- using (var server = new NameServer(0))
- {
- var nameClient = new NameClient(server.LocalEndpoint);
- var nameClient2 = new NameClient(server.LocalEndpoint);
- nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
- nameClient.Lookup("1");
- }
- }
-
- [TestMethod]
- public void TestNameServerNoRequestsTwoClients2()
- {
- using (var server = new NameServer(0))
- {
- var nameClient = new NameClient(server.LocalEndpoint);
- var nameClient2 = new NameClient(server.LocalEndpoint);
- nameClient2.Register("1", new IPEndPoint(IPAddress.Any, 8080));
- nameClient.Lookup("1");
- }
- }
-
- [TestMethod]
- public void TestNameServerMultipleRequestsTwoClients()
- {
- using (var server = new NameServer(0))
- {
- var nameClient = new NameClient(server.LocalEndpoint);
- var nameClient2 = new NameClient(server.LocalEndpoint);
- nameClient.Register("1", new IPEndPoint(IPAddress.Any, 8080));
- nameClient2.Lookup("1");
- }
- }
-
- [TestMethod]
- public void TestRegister()
- {
- using (INameServer server = BuildNameServer())
- {
- using (INameClient client = BuildNameClient(server.LocalEndpoint))
- {
- IPEndPoint endpoint1 = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
- IPEndPoint endpoint2 = new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200);
- IPEndPoint endpoint3 = new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300);
-
- // Check that no endpoints have been registered
- Assert.IsNull(client.Lookup("a"));
- Assert.IsNull(client.Lookup("b"));
- Assert.IsNull(client.Lookup("c"));
-
- // Register endpoints
- client.Register("a", endpoint1);
- client.Register("b", endpoint2);
- client.Register("c", endpoint3);
-
- // Check that they can be looked up correctly
- Assert.AreEqual(endpoint1, client.Lookup("a"));
- Assert.AreEqual(endpoint2, client.Lookup("b"));
- Assert.AreEqual(endpoint3, client.Lookup("c"));
- }
- }
- }
-
- [TestMethod]
- public void TestUnregister()
- {
- using (INameServer server = BuildNameServer())
- {
- using (INameClient client = BuildNameClient(server.LocalEndpoint))
- {
- IPEndPoint endpoint1 = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
-
- // Register endpoint
- client.Register("a", endpoint1);
-
- // Check that it can be looked up correctly
- Assert.AreEqual(endpoint1, client.Lookup("a"));
-
- // Unregister endpoints
- client.Unregister("a");
- Thread.Sleep(1000);
-
- // Make sure they were unregistered correctly
- Assert.IsNull(client.Lookup("a"));
- }
- }
- }
-
- [TestMethod]
- public void TestLookup()
- {
- using (INameServer server = BuildNameServer())
- {
- using (INameClient client = BuildNameClient(server.LocalEndpoint))
- {
- IPEndPoint endpoint1 = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
- IPEndPoint endpoint2 = new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200);
-
- // Register endpoint1
- client.Register("a", endpoint1);
- Assert.AreEqual(endpoint1, client.Lookup("a"));
-
- // Reregister identifer a
- client.Register("a", endpoint2);
- Assert.AreEqual(endpoint2, client.Lookup("a"));
- }
- }
- }
-
- [TestMethod]
- public void TestLookupList()
- {
- using (INameServer server = BuildNameServer())
- {
- using (INameClient client = BuildNameClient(server.LocalEndpoint))
- {
- IPEndPoint endpoint1 = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
- IPEndPoint endpoint2 = new IPEndPoint(IPAddress.Parse("100.0.0.2"), 200);
- IPEndPoint endpoint3 = new IPEndPoint(IPAddress.Parse("100.0.0.3"), 300);
-
- // Register endpoints
- client.Register("a", endpoint1);
- client.Register("b", endpoint2);
- client.Register("c", endpoint3);
-
- // Look up both at the same time
- List<string> ids = new List<string> { "a", "b", "c", "d" };
- List<NameAssignment> assignments = client.Lookup(ids);
-
- // Check that a, b, and c are registered
- Assert.AreEqual("a", assignments[0].Identifier);
- Assert.AreEqual(endpoint1, assignments[0].Endpoint);
- Assert.AreEqual("b", assignments[1].Identifier);
- Assert.AreEqual(endpoint2, assignments[1].Endpoint);
- Assert.AreEqual("c", assignments[2].Identifier);
- Assert.AreEqual(endpoint3, assignments[2].Endpoint);
-
- // Check that d is not registered
- Assert.AreEqual(3, assignments.Count);
- }
- }
- }
-
- [TestMethod]
- public void TestNameClientRestart()
- {
- int oldPort = 6666;
- int newPort = 6662;
- INameServer server = new NameServer(oldPort);
-
- using (INameClient client = BuildNameClient(server.LocalEndpoint))
- {
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("100.0.0.1"), 100);
-
- client.Register("a", endpoint);
- Assert.AreEqual(endpoint, client.Lookup("a"));
-
- server.Dispose();
-
- server = new NameServer(newPort);
- client.Restart(server.LocalEndpoint);
-
- client.Register("b", endpoint);
- Assert.AreEqual(endpoint, client.Lookup("b"));
-
- server.Dispose();
- }
- }
-
- [TestMethod]
- public void TestConstructorInjection()
- {
- int port = 6666;
- using (INameServer server = new NameServer(port))
- {
- IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule
- .Set(NamingConfiguration.NameServerAddress, server.LocalEndpoint.Address.ToString())
- .Set(NamingConfiguration.NameServerPort, port + string.Empty)
- .Build();
-
- ConstructorInjection c = TangFactory.GetTang()
- .NewInjector(nameClientConfiguration)
- .GetInstance<ConstructorInjection>();
-
- Assert.IsNotNull(c);
- }
- }
-
- private INameServer BuildNameServer()
- {
- var builder = TangFactory.GetTang()
- .NewConfigurationBuilder()
- .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
- GenericType<NamingConfigurationOptions.NameServerPort>.Class, "0");
-
- return TangFactory.GetTang().NewInjector(builder.Build()).GetInstance<INameServer>();
- }
-
- private INameClient BuildNameClient(IPEndPoint remoteEndpoint)
- {
- string nameServerAddr = remoteEndpoint.Address.ToString();
- int nameServerPort = remoteEndpoint.Port;
- IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule
- .Set(NamingConfiguration.NameServerAddress, nameServerAddr)
- .Set(NamingConfiguration.NameServerPort, nameServerPort + string.Empty)
- .Build();
-
- return TangFactory.GetTang().NewInjector(nameClientConfiguration).GetInstance<NameClient>();
- }
-
- private class ConstructorInjection
- {
- [Inject]
- public ConstructorInjection(NameClient client)
- {
- if (client == null)
- {
- throw new ArgumentNullException("client");
- }
- }
- }
- }
-}