You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by se...@apache.org on 2015/04/14 20:02:19 UTC
[2/3] incubator-reef git commit: [REEF-249] Adding Network.Examples
for Group Communication
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
new file mode 100644
index 0000000..f2b1c4a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
+{
+    /// <summary>
+    /// Slave task of the scatter/reduce group communication example. It receives
+    /// a list of integers from the scatter operator, sums them, and sends the
+    /// sum through the reduce operator.
+    /// </summary>
+    public class SlaveTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IScatterReceiver<int> _scatterReceiver;
+        private readonly IReduceSender<int> _sumSender;
+
+        /// <summary>
+        /// Looks up the communication group named by GroupTestConstants.GroupName
+        /// and obtains the scatter receiver and the reduce sender from it.
+        /// </summary>
+        /// <param name="mpiClient">Client used to look up the communication group; it is disposed by Dispose().</param>
+        [Inject]
+        public SlaveTask(IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            ICommunicationGroupClient group = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+
+            _mpiClient = mpiClient;
+            _commGroup = group;
+            _scatterReceiver = group.GetScatterReceiver<int>(GroupTestConstants.ScatterOperatorName);
+            _sumSender = group.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Receives this task's list of integers, logs it, and sends back its sum.
+        /// </summary>
+        /// <param name="memento">Not used by this task.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            List<int> received = _scatterReceiver.Receive();
+            string joined = string.Join(" ", received);
+            _logger.Log(Level.Info, "Received data: {0}", joined);
+
+            int total = received.Sum();
+            _logger.Log(Level.Info, "Sending back sum: {0}", total);
+            _sumSender.Send(total);
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the injected MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
new file mode 100644
index 0000000..6914923
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <ProjectGuid>{B1B43B60-DDD0-4805-A9B4-BA84A0CCB7C7}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Network.Examples</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Network.Examples</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <RestorePackages>true</RestorePackages>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+ </PropertyGroup>
+ <PropertyGroup>
+ <StartupObject />
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\build.props" />
+ <PropertyGroup>
+ <BuildPackage>false</BuildPackage>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\BroadcastReduceDriver.cs" />
+ <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\MasterTask.cs" />
+ <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\SlaveTask.cs" />
+ <Compile Include="GroupCommunication\GroupTestConfig.cs" />
+ <Compile Include="GroupCommunication\GroupTestConstants.cs" />
+ <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedBroadcastReduceDriver.cs" />
+ <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedMasterTask.cs" />
+ <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedSlaveTask.cs" />
+ <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\MasterTask.cs" />
+ <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\ScatterReduceDriver.cs" />
+ <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\SlaveTask.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+ <Name>Org.Apache.REEF.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+ <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+ <Name>Org.Apache.REEF.Driver</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+ <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
+ <Name>Org.Apache.REEF.Network</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Org.Apache.REEF.Utilities</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..8fc8a33
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+// Title and product are both set to "Org.Apache.REEF.Network.Examples";
+// description, configuration, company, trademark and culture are left as
+// empty strings.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network.Examples")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network.Examples")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a3eafd9d-1be4-44a7-9f1c-9a81a1e59897")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf b/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf
deleted file mode 100644
index 67256f5..0000000
Binary files a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
new file mode 100644
index 0000000..94529c1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    /// <summary>
+    /// Functional test that runs the broadcast/reduce group communication
+    /// example driven by BroadcastReduceDriver.
+    /// </summary>
+    [TestClass]
+    public class BroadcastReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Calls CleanUp() before each test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Calls CleanUp() after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs the example with 9 tasks and runOnYarn = false, then validates
+        /// the result with ValidateSuccessForLocalRuntime.
+        /// </summary>
+        [TestMethod]
+        public void TestBroadcastAndReduceOnLocalRuntime()
+        {
+            int numTasks = 9;
+            TestBroadcastAndReduce(false, numTasks);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+
+        /// <summary>
+        /// Runs the example with 9 tasks and runOnYarn = true. Marked [Ignore],
+        /// so it is not executed by default.
+        /// </summary>
+        [Ignore]
+        [TestMethod]
+        public void TestBroadcastAndReduceOnYarn()
+        {
+            int numTasks = 9;
+            TestBroadcastAndReduce(true, numTasks);
+        }
+
+        /// <summary>
+        /// Builds the driver and MPI driver configurations, merges them, and
+        /// hands them to TestRun together with the names of the required assemblies.
+        /// </summary>
+        /// <remarks>
+        /// Deliberately NOT marked [TestMethod]: MSTest requires test methods to be
+        /// parameterless, so attributing this helper would make the runner report
+        /// an invalid test. It is invoked by the two test methods above.
+        /// </remarks>
+        /// <param name="runOnYarn">Forwarded unchanged to TestRun.</param>
+        /// <param name="numTasks">Bound as both NumEvaluators and NumberOfTasks.</param>
+        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // FanOut is formatted exactly once; a second ToString on the resulting
+            // string would be a no-op.
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            // Names of the assemblies that define the types referenced here.
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
new file mode 100644
index 0000000..0c918b5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    /// <summary>
+    /// Functional test that runs the pipelined broadcast/reduce group
+    /// communication example driven by PipelinedBroadcastReduceDriver.
+    /// </summary>
+    [TestClass]
+    public class PipelinedBroadcastReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Calls CleanUp() before each test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Calls CleanUp() after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs the example with 9 tasks and runOnYarn = false, then validates
+        /// the result with ValidateSuccessForLocalRuntime.
+        /// </summary>
+        [TestMethod]
+        public void TestPipelinedBroadcastAndReduceOnLocalRuntime()
+        {
+            const int taskCount = 9;
+            TestBroadcastAndReduce(false, taskCount);
+            ValidateSuccessForLocalRuntime(taskCount);
+        }
+
+        /// <summary>
+        /// Runs the example with 9 tasks and runOnYarn = true. Marked [Ignore],
+        /// so it is not executed by default.
+        /// </summary>
+        [Ignore]
+        [TestMethod]
+        public void TestPipelinedBroadcastAndReduceOnYarn()
+        {
+            const int taskCount = 9;
+            TestBroadcastAndReduce(true, taskCount);
+        }
+
+        /// <summary>
+        /// Shared helper (not a test method itself): merges the driver and MPI
+        /// driver configurations and passes them to TestRun together with the
+        /// names of the required assemblies.
+        /// </summary>
+        /// <param name="runOnYarn">Forwarded unchanged to TestRun.</param>
+        /// <param name="numTasks">Bound as both NumEvaluators and NumberOfTasks.</param>
+        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConf = BuildDriverConfiguration(numTasks);
+            IConfiguration mpiDriverConf = BuildMpiDriverConfiguration(numTasks);
+            IConfiguration mergedConf = Configurations.Merge(driverConf, mpiDriverConf);
+
+            // Names of the assemblies that define the types referenced here.
+            HashSet<string> assemblyNames = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            TestRun(assemblyNames, mergedConf, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+
+        // Driver bridge handlers plus the NumIterations, NumEvaluators and ChunkSize bindings.
+        private static IConfiguration BuildDriverConfiguration(int numTasks)
+        {
+            IConfiguration bridgeConf = DriverBridgeConfiguration.ConfigurationModule
+                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(bridgeConf)
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+                    GenericType<GroupTestConfig.ChunkSize>.Class,
+                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        // Driver id, master task id, group name, fan-out and task count for the MPI driver.
+        private static IConfiguration BuildMpiDriverConfiguration(int numTasks)
+        {
+            string fanOut = GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture);
+            string taskCount = numTasks.ToString(CultureInfo.InvariantCulture);
+
+            return TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut)
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(taskCount)
+                .Build();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
new file mode 100644
index 0000000..083261a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    /// <summary>
+    /// Functional test that runs the scatter/reduce group communication
+    /// example driven by ScatterReduceDriver.
+    /// </summary>
+    [TestClass]
+    public class ScatterReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Calls CleanUp() before each test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Calls CleanUp() after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs the example with 5 tasks and runOnYarn = false, then validates
+        /// the result with ValidateSuccessForLocalRuntime.
+        /// </summary>
+        [TestMethod]
+        public void TestScatterAndReduceOnLocalRuntime()
+        {
+            int numTasks = 5;
+            TestScatterAndReduce(false, numTasks);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+
+        /// <summary>
+        /// Runs the example with 5 tasks and runOnYarn = true. Marked [Ignore],
+        /// so it is not executed by default.
+        /// </summary>
+        [Ignore]
+        [TestMethod]
+        public void TestScatterAndReduceOnYarn()
+        {
+            int numTasks = 5;
+            TestScatterAndReduce(true, numTasks);
+        }
+
+        /// <summary>
+        /// Builds the driver and MPI driver configurations, merges them, and
+        /// hands them to TestRun together with the names of the required assemblies.
+        /// </summary>
+        /// <remarks>
+        /// Deliberately NOT marked [TestMethod]: MSTest requires test methods to be
+        /// parameterless, so attributing this helper would make the runner report
+        /// an invalid test. It is invoked by the two test methods above.
+        /// </remarks>
+        /// <param name="runOnYarn">Forwarded unchanged to TestRun.</param>
+        /// <param name="numTasks">Bound as both NumEvaluators and NumberOfTasks.</param>
+        public void TestScatterAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // Numeric values are formatted once, always with the invariant culture,
+            // so the bound strings do not depend on the machine's locale.
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            // Names of the assemblies that define the types referenced here.
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
deleted file mode 100644
index 8cc32b8..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.Linq;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
- /// <summary>
- /// Driver of the broadcast/reduce MPI functional test. It acts as the start
- /// handler and observes the evaluator requestor, allocated evaluators, active
- /// contexts and failed evaluators.
- /// </summary>
- public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver));
-
- private readonly int _numEvaluators;
- private readonly int _numIterations;
-
- private readonly IMpiDriver _mpiDriver;
- private readonly ICommunicationGroupDriver _commGroup;
- private readonly TaskStarter _mpiTaskStarter;
-
- /// <summary>
- /// Stores the injected parameters, adds a broadcast operator and a reduce
- /// operator (using SumFunction) to the MPI driver's default group, both
- /// registered with MpiTestConstants.MasterTaskId, creates the TaskStarter,
- /// and finally calls CreateClassHierarchy().
- /// </summary>
- /// <param name="numEvaluators">Value of MpiTestConfig.NumEvaluators.</param>
- /// <param name="numIterations">Value of MpiTestConfig.NumIterations.</param>
- /// <param name="mpiDriver">MPI driver providing the default communication group.</param>
- [Inject]
- public BroadcastReduceDriver(
- [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
- [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations,
- MpiDriver mpiDriver)
- {
- Identifier = "BroadcastStartHandler";
- _numEvaluators = numEvaluators;
- _numIterations = numIterations;
- _mpiDriver = mpiDriver;
- _commGroup = _mpiDriver.DefaultGroup
- .AddBroadcast<int, IntCodec>(
- MpiTestConstants.BroadcastOperatorName,
- MpiTestConstants.MasterTaskId)
- .AddReduce<int, IntCodec>(
- MpiTestConstants.ReduceOperatorName,
- MpiTestConstants.MasterTaskId,
- new SumFunction())
- .Build();
-
- _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
-
- CreateClassHierarchy();
- }
-
- /// <summary>
- /// Identifier of this handler; set to "BroadcastStartHandler" by the constructor.
- /// </summary>
- public string Identifier { get; set; }
-
- /// <summary>
- /// Submits a single request for _numEvaluators evaluators.
- /// </summary>
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- // NOTE(review): 512 and 2 are presumably the memory size and core count
- // per evaluator - confirm against the EvaluatorRequest constructor.
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
-
- /// <summary>
- /// Submits the context and service configurations obtained from the MPI
- /// driver to the newly allocated evaluator.
- /// </summary>
- public void OnNext(IAllocatedEvaluator allocatedEvaluator)
- {
- IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
- IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
- allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
- }
-
- /// <summary>
- /// Builds the task configuration for the active context - MasterTask when
- /// the MPI driver identifies it as the master task context, SlaveTask
- /// otherwise - adds the task id to the communication group, and queues the
- /// task on the task starter.
- /// </summary>
- public void OnNext(IActiveContext activeContext)
- {
- if (_mpiDriver.IsMasterTaskContext(activeContext))
- {
- // Configure Master Task
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId)
- .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
- .Build())
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- _numEvaluators.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- _numIterations.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- _commGroup.AddTask(MpiTestConstants.MasterTaskId);
- _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
- }
- else
- {
- // Configure Slave Task
- // The slave task id is derived from the context id.
- string slaveTaskId = "SlaveTask-" + activeContext.Id;
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, slaveTaskId)
- .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
- .Build())
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- _numEvaluators.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- _numIterations.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- _commGroup.AddTask(slaveTaskId);
- _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
- }
- }
-
- /// <summary>
- /// No-op: failed evaluators are ignored by this driver.
- /// </summary>
- public void OnNext(IFailedEvaluator value)
- {
- }
-
- /// <summary>
- /// No-op: errors are ignored by this driver.
- /// </summary>
- public void OnError(Exception error)
- {
- }
-
- /// <summary>
- /// No-op.
- /// </summary>
- public void OnCompleted()
- {
- }
-
- /// <summary>
- /// Collects the names of the assemblies that define IDriver, ITask, this
- /// driver, INameClient and INetworkService, and passes them to
- /// ClrHandlerHelper.GenerateClassHierarchy.
- /// </summary>
- private void CreateClassHierarchy()
- {
- HashSet<string> clrDlls = new HashSet<string>();
- clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
- clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
- clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
- clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
- clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
- ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
- }
-
- /// <summary>
- /// Reduce function that returns the sum of the given integers.
- /// </summary>
- private class SumFunction : IReduceFunction<int>
- {
- [Inject]
- public SumFunction()
- {
- }
-
- /// <summary>
- /// Returns the sum of <paramref name="elements"/>.
- /// </summary>
- public int Reduce(IEnumerable<int> elements)
- {
- return elements.Sum();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
deleted file mode 100644
index fea51de..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Globalization;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
- [TestClass]
- public class BroadcastReduceTest : ReefFunctionalTest
- {
- [TestInitialize]
- public void TestSetup()
- {
- CleanUp();
- }
-
- [TestCleanup]
- public void TestCleanup()
- {
- CleanUp();
- }
-
- [TestMethod]
- public void TestBroadcastAndReduceOnLocalRuntime()
- {
- int numTasks = 9;
- TestBroadcastAndReduce(false, numTasks);
- ValidateSuccessForLocalRuntime(numTasks);
- }
-
- [Ignore]
- [TestMethod]
- public void TestBroadcastAndReduceOnYarn()
- {
- int numTasks = 9;
- TestBroadcastAndReduce(true, numTasks);
- }
-
- [TestMethod]
- public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
- {
- IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
- .Build())
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- numTasks.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
- .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId)
- .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId)
- .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName)
- .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
- .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
-
- HashSet<string> appDlls = new HashSet<string>();
- appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
- appDlls.Add(typeof(ITask).Assembly.GetName().Name);
- appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
- appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
- appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
- TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
deleted file mode 100644
index fd3084d..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
- public class MasterTask : ITask
- {
- private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
-
- private readonly int _numIters;
- private readonly int _numReduceSenders;
-
- private readonly IMpiClient _mpiClient;
- private readonly ICommunicationGroupClient _commGroup;
- private readonly IBroadcastSender<int> _broadcastSender;
- private readonly IReduceReceiver<int> _sumReducer;
-
- [Inject]
- public MasterTask(
- [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
- [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
- IMpiClient mpiClient)
- {
- _logger.Log(Level.Info, "Hello from master task");
- _numIters = numIters;
- _numReduceSenders = numEvaluators - 1;
- _mpiClient = mpiClient;
-
- _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
- _broadcastSender = _commGroup.GetBroadcastSender<int>(MpiTestConstants.BroadcastOperatorName);
- _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
- }
-
- public byte[] Call(byte[] memento)
- {
- for (int i = 1; i <= _numIters; i++)
- {
- // Each slave task calculates the nth triangle number
- _broadcastSender.Send(i);
-
- // Sum up all of the calculated triangle numbers
- int sum = _sumReducer.Reduce();
- _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
-
- int expected = TriangleNumber(i) * _numReduceSenders;
- if (sum != TriangleNumber(i) * _numReduceSenders)
- {
- throw new Exception("Expected " + expected + " but got " + sum);
- }
- }
-
- return null;
- }
-
- public void Dispose()
- {
- _mpiClient.Dispose();
- }
-
- private int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
deleted file mode 100644
index 08fbde4..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
- public class SlaveTask : ITask
- {
- private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
-
- private readonly int _numIterations;
- private readonly IMpiClient _mpiClient;
- private readonly ICommunicationGroupClient _commGroup;
- private readonly IBroadcastReceiver<int> _broadcastReceiver;
- private readonly IReduceSender<int> _triangleNumberSender;
-
- [Inject]
- public SlaveTask(
- [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
- IMpiClient mpiClient)
- {
- _logger.Log(Level.Info, "Hello from slave task");
-
- _numIterations = numIters;
- _mpiClient = mpiClient;
- _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
- _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(MpiTestConstants.BroadcastOperatorName);
- _triangleNumberSender = _commGroup.GetReduceSender<int>(MpiTestConstants.ReduceOperatorName);
- }
-
- public byte[] Call(byte[] memento)
- {
- for (int i = 0; i < _numIterations; i++)
- {
- // Receive n from Master Task
- int n = _broadcastReceiver.Receive();
- _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n);
-
- // Calculate the nth Triangle number and send it back to driver
- int triangleNum = TriangleNumber(n);
- _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i);
- _triangleNumberSender.Send(triangleNum);
- }
-
- return null;
- }
-
- public void Dispose()
- {
- _mpiClient.Dispose();
- }
-
- private int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
deleted file mode 100644
index a3989a0..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI
-{
- public class MpiTestConfig
- {
- [NamedParameter("Number of iterations of messages to send")]
- public class NumIterations : Name<int>
- {
- }
-
- [NamedParameter("Number of Evaluators")]
- public class NumEvaluators : Name<int>
- {
- }
-
- [NamedParameter("tree width")]
- public class FanOut : Name<int>
- {
- }
-
- [NamedParameter("integer id of the evaluator")]
- public class EvaluatorId : Name<string>
- {
- }
-
- [NamedParameter("Size of the array")]
- public class ArraySize : Name<int>
- {
- }
-
- [NamedParameter("Chunk size for pipelining")]
- public class ChunkSize : Name<int>
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
deleted file mode 100644
index 668add3..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace Org.Apache.REEF.Tests.Functional.MPI
-{
- internal class MpiTestConstants
- {
- public const string DriverId = "BroadcastReduceDriver";
- public const string GroupName = "BroadcastReduceGroup";
- public const string BroadcastOperatorName = "Broadcast";
- public const string ReduceOperatorName = "Reduce";
- public const string ScatterOperatorName = "Scatter";
- public const string MasterTaskId = "MasterTask";
- public const string SlaveTaskId = "SlaveTask-";
- public const int NumIterations = 10;
- public const int FanOut = 2;
- public const int ChunkSize = 2;
- public const int ArrayLength = 6;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
deleted file mode 100644
index 98a68dd..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.Linq;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Topology;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
- public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
-
- private readonly int _numEvaluators;
- private readonly int _numIterations;
- private readonly int _chunkSize;
-
- private readonly IMpiDriver _mpiDriver;
- private readonly ICommunicationGroupDriver _commGroup;
- private readonly TaskStarter _mpiTaskStarter;
-
- [Inject]
- public PipelinedBroadcastReduceDriver(
- [Parameter(typeof (MpiTestConfig.NumEvaluators))] int numEvaluators,
- [Parameter(typeof (MpiTestConfig.NumIterations))] int numIterations,
- [Parameter(typeof (MpiTestConfig.ChunkSize))] int chunkSize,
- MpiDriver mpiDriver)
- {
- Logger.Log(Level.Info, "*******entering the driver code " + chunkSize);
-
- Identifier = "BroadcastStartHandler";
- _numEvaluators = numEvaluators;
- _numIterations = numIterations;
- _chunkSize = chunkSize;
-
- _mpiDriver = mpiDriver;
-
- _commGroup = _mpiDriver.DefaultGroup
- .AddBroadcast<int[], IntArrayCodec>(
- MpiTestConstants.BroadcastOperatorName,
- MpiTestConstants.MasterTaskId,
- TopologyTypes.Tree,
- new PipelineIntDataConverter(_chunkSize))
- .AddReduce<int[], IntArrayCodec>(
- MpiTestConstants.ReduceOperatorName,
- MpiTestConstants.MasterTaskId,
- new ArraySumFunction(),
- TopologyTypes.Tree,
- new PipelineIntDataConverter(_chunkSize))
- .Build();
-
- _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
-
- CreateClassHierarchy();
- }
-
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
-
- public void OnNext(IAllocatedEvaluator allocatedEvaluator)
- {
- IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
- IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
- allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
- }
-
- public void OnNext(IActiveContext activeContext)
- {
- if (_mpiDriver.IsMasterTaskContext(activeContext))
- {
- Logger.Log(Level.Info, "******* Master ID " + activeContext.Id );
-
- // Configure Master Task
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId)
- .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
- .Build())
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- _numEvaluators.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- _numIterations.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.ArraySize, int>(
- GenericType<MpiTestConfig.ArraySize>.Class,
- MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- _commGroup.AddTask(MpiTestConstants.MasterTaskId);
- _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
- }
- else
- {
- // Configure Slave Task
- string slaveTaskId = "SlaveTask-" + activeContext.Id;
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
- TaskConfiguration.ConfigurationModule
- .Set(TaskConfiguration.Identifier, slaveTaskId)
- .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
- .Build())
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- _numEvaluators.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- _numIterations.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.ArraySize, int>(
- GenericType<MpiTestConfig.ArraySize>.Class,
- MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- _commGroup.AddTask(slaveTaskId);
- _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
- }
- }
-
- public void OnNext(IFailedEvaluator value)
- {
- }
-
- public void OnError(Exception error)
- {
- }
-
- public void OnCompleted()
- {
- }
-
- private void CreateClassHierarchy()
- {
- HashSet<string> clrDlls = new HashSet<string>();
- clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
- clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
- clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
- clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
- clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
- ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
- }
-
- private class SumFunction : IReduceFunction<int>
- {
- [Inject]
- public SumFunction()
- {
- }
-
- public int Reduce(IEnumerable<int> elements)
- {
- return elements.Sum();
- }
- }
-
- private class ArraySumFunction : IReduceFunction<int[]>
- {
- [Inject]
- public ArraySumFunction()
- {
- }
-
- public int[] Reduce(IEnumerable<int[]> elements)
- {
- int[] result = null;
- int count = 0;
-
- foreach (var element in elements)
- {
- if (count == 0)
- {
- result = element.Clone() as int[];
- }
- else
- {
- if (element.Length != result.Length)
- {
- throw new Exception("integer arrays are of different sizes");
- }
-
- for (int i = 0; i < result.Length; i++)
- {
- result[i] += element[i];
- }
- }
-
- count++;
- }
-
- return result;
- }
- }
-
-
- private class IntArrayCodec : ICodec<int[]>
- {
- [Inject]
- public IntArrayCodec()
- {
- }
-
- public byte[] Encode(int[] obj)
- {
- byte[] result = new byte[sizeof(Int32) * obj.Length];
- Buffer.BlockCopy(obj, 0, result, 0, result.Length);
- return result;
- }
-
- public int[] Decode(byte[] data)
- {
- if (data.Length % sizeof(Int32) != 0)
- {
- throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
- }
-
- int[] result = new int[data.Length / sizeof(Int32)];
- Buffer.BlockCopy(data, 0, result, 0, data.Length);
- return result;
- }
- }
-
- public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
- {
- readonly int _chunkSize;
-
- [Inject]
- public PipelineIntDataConverter([Parameter(typeof(MpiTestConfig.ChunkSize))] int chunkSize)
- {
- _chunkSize = chunkSize;
- }
-
- public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
- {
- List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
- int totalChunks = message.Length / _chunkSize;
-
- if (message.Length % _chunkSize != 0)
- {
- totalChunks++;
- }
-
- int counter = 0;
- for (int i = 0; i < message.Length; i += _chunkSize)
- {
- int[] data = new int[Math.Min(_chunkSize, message.Length - i)];
- Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int));
-
- messageList.Add(counter == totalChunks - 1
- ? new PipelineMessage<int[]>(data, true)
- : new PipelineMessage<int[]>(data, false));
-
- counter++;
- }
-
- return messageList;
- }
-
- public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
- {
- int size = pipelineMessage.Select(x => x.Data.Length).Sum();
- int[] data = new int[size];
- int offset = 0;
-
- foreach (var message in pipelineMessage)
- {
- Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int));
- offset += message.Data.Length * sizeof(int);
- }
-
- return data;
- }
-
- public IConfiguration GetConfiguration()
- {
- return TangFactory.GetTang().NewConfigurationBuilder()
- .BindNamedParameter<MpiTestConfig.ChunkSize, int>(GenericType<MpiTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture))
- .Build();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
deleted file mode 100644
index 05d0b9b..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Globalization;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
- [TestClass]
- public class PipelinedBroadcastReduceTest : ReefFunctionalTest
- {
- [TestInitialize]
- public void TestSetup()
- {
- CleanUp();
- }
-
- [TestCleanup]
- public void TestCleanup()
- {
- CleanUp();
- }
-
- [TestMethod]
- public void TestPipelinedBroadcastAndReduceOnLocalRuntime()
- {
- const int numTasks = 9;
- TestBroadcastAndReduce(false, numTasks);
- ValidateSuccessForLocalRuntime(numTasks);
- }
-
- [Ignore]
- [TestMethod]
- public void TestPipelinedBroadcastAndReduceOnYarn()
- {
- const int numTasks = 9;
- TestBroadcastAndReduce(true, numTasks);
- }
-
- public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
- {
- IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
- .Build())
- .BindNamedParameter<MpiTestConfig.NumIterations, int>(
- GenericType<MpiTestConfig.NumIterations>.Class,
- MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
- GenericType<MpiTestConfig.NumEvaluators>.Class,
- numTasks.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<MpiTestConfig.ChunkSize, int>(
- GenericType<MpiTestConfig.ChunkSize>.Class,
- MpiTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
- .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId)
- .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId)
- .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName)
- .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
- .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
- .Build();
-
- IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
-
- HashSet<string> appDlls = new HashSet<string>();
- appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
- appDlls.Add(typeof(ITask).Assembly.GetName().Name);
- appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
- appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
- appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
- TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
deleted file mode 100644
index 922f294..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
- public class PipelinedMasterTask : ITask
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask));
-
- private readonly int _numIters;
- private readonly int _numReduceSenders;
- private readonly int _arraySize;
-
- private readonly IMpiClient _mpiClient;
- private readonly ICommunicationGroupClient _commGroup;
- private readonly IBroadcastSender<int[]> _broadcastSender;
- private readonly IReduceReceiver<int[]> _sumReducer;
-
- [Inject]
- public PipelinedMasterTask(
- [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
- [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
- [Parameter(typeof(MpiTestConfig.ArraySize))] int arraySize,
- IMpiClient mpiClient)
- {
- Logger.Log(Level.Info, "Hello from master task");
- _numIters = numIters;
- _numReduceSenders = numEvaluators - 1;
- _arraySize = arraySize;
- _mpiClient = mpiClient;
-
- _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
- _broadcastSender = _commGroup.GetBroadcastSender<int[]>(MpiTestConstants.BroadcastOperatorName);
- _sumReducer = _commGroup.GetReduceReceiver<int[]>(MpiTestConstants.ReduceOperatorName);
- Logger.Log(Level.Info, "finished master task constructor");
- }
-
- public byte[] Call(byte[] memento)
- {
- int[] intArr = new int[_arraySize];
-
- for (int i = 1; i <= _numIters; i++)
- {
- for (int j = 0; j < _arraySize; j++)
- {
- intArr[j] = i;
- }
-
- _broadcastSender.Send(intArr);
- int[] sum = _sumReducer.Reduce();
-
- Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
-
- int expected = TriangleNumber(i) * _numReduceSenders;
-
- for (int j = 0; j < intArr.Length; j++)
- {
- if (sum[j] != TriangleNumber(i) * _numReduceSenders)
- {
- throw new Exception("Expected " + expected + " but got " + sum);
- }
- }
- }
-
- return null;
- }
-
- public void Dispose()
- {
- _mpiClient.Dispose();
- }
-
- private int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
deleted file mode 100644
index 5455121..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
- public class PipelinedSlaveTask : ITask
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask));
-
- private readonly int _numIterations;
- private readonly IMpiClient _mpiClient;
- private readonly ICommunicationGroupClient _commGroup;
- private readonly IBroadcastReceiver<int[]> _broadcastReceiver;
- private readonly IReduceSender<int[]> _triangleNumberSender;
-
- [Inject]
- public PipelinedSlaveTask(
- [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
- IMpiClient mpiClient)
- {
- Logger.Log(Level.Info, "Hello from slave task");
-
- _numIterations = numIters;
- _mpiClient = mpiClient;
- _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
- _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(MpiTestConstants.BroadcastOperatorName);
- _triangleNumberSender = _commGroup.GetReduceSender<int[]>(MpiTestConstants.ReduceOperatorName);
- }
-
- public byte[] Call(byte[] memento)
- {
- for (int i = 0; i < _numIterations; i++)
- {
- // Receive n from Master Task
- int[] intVec = _broadcastReceiver.Receive();
-
- Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]);
-
- // Calculate the nth Triangle number and send it back to driver
- int triangleNum = TriangleNumber(intVec[0]);
- Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i);
-
- int[] resArr = new int[intVec.Length];
-
- for (int j = 0; j < resArr.Length; j++)
- {
- resArr[j] = triangleNum;
- }
-
- _triangleNumberSender.Send(resArr);
- }
-
- return null;
- }
-
- public void Dispose()
- {
- _mpiClient.Dispose();
- }
-
- private int TriangleNumber(int n)
- {
- return Enumerable.Range(1, n).Sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
deleted file mode 100644
index e563534..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
-{
- public class MasterTask : ITask
- {
- private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
-
- private readonly IMpiClient _mpiClient;
- private readonly ICommunicationGroupClient _commGroup;
- private readonly IScatterSender<int> _scatterSender;
- private readonly IReduceReceiver<int> _sumReducer;
-
- [Inject]
- public MasterTask(IMpiClient mpiClient)
- {
- _logger.Log(Level.Info, "Hello from master task");
- _mpiClient = mpiClient;
-
- _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
- _scatterSender = _commGroup.GetScatterSender<int>(MpiTestConstants.ScatterOperatorName);
- _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
- }
-
- public byte[] Call(byte[] memento)
- {
- List<int> data = Enumerable.Range(1, 100).ToList();
- _scatterSender.Send(data);
-
- int sum = _sumReducer.Reduce();
- _logger.Log(Level.Info, "Received sum: {0}", sum);
-
- return null;
- }
-
- public void Dispose()
- {
- _mpiClient.Dispose();
- }
-
- private List<string> GetScatterOrder()
- {
- return new List<string> { "SlaveTask-4", "SlaveTask-3", "SlaveTask-2", "SlaveTask-1" };
- }
- }
-}