You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by se...@apache.org on 2015/04/14 20:02:19 UTC

[2/3] incubator-reef git commit: [REEF-249] Adding Network.Examples for Group Communication

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
new file mode 100644
index 0000000..f2b1c4a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
+{
+    public class SlaveTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IScatterReceiver<int> _scatterReceiver;
+        private readonly IReduceSender<int> _sumSender;
+
+        [Inject]
+        public SlaveTask(IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            _mpiClient = mpiClient;
+            _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _scatterReceiver = _commGroup.GetScatterReceiver<int>(GroupTestConstants.ScatterOperatorName);
+            _sumSender = _commGroup.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        public byte[] Call(byte[] memento)
+        {
+            List<int> data = _scatterReceiver.Receive();
+            _logger.Log(Level.Info, "Received data: {0}", string.Join(" ", data));
+
+            int sum = data.Sum();
+            _logger.Log(Level.Info, "Sending back sum: {0}", sum);
+            _sumSender.Send(sum);
+
+            return null;
+        }
+
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
new file mode 100644
index 0000000..6914923
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/Org.Apache.REEF.Network.Examples.csproj
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <ProjectGuid>{B1B43B60-DDD0-4805-A9B4-BA84A0CCB7C7}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.REEF.Network.Examples</RootNamespace>
+    <AssemblyName>Org.Apache.REEF.Network.Examples</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <RestorePackages>true</RestorePackages>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir>
+  </PropertyGroup>
+  <PropertyGroup>
+    <StartupObject />
+  </PropertyGroup>
+  <Import Project="$(SolutionDir)\build.props" />
+  <PropertyGroup>
+    <BuildPackage>false</BuildPackage>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\BroadcastReduceDriver.cs" />
+    <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\MasterTask.cs" />
+    <Compile Include="GroupCommunication\BroadcastReduceDriverAndTasks\SlaveTask.cs" />
+    <Compile Include="GroupCommunication\GroupTestConfig.cs" />
+    <Compile Include="GroupCommunication\GroupTestConstants.cs" />
+    <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedBroadcastReduceDriver.cs" />
+    <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedMasterTask.cs" />
+    <Compile Include="GroupCommunication\PipelineBroadcastReduceDriverAndTasks\PipelinedSlaveTask.cs" />
+    <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\MasterTask.cs" />
+    <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\ScatterReduceDriver.cs" />
+    <Compile Include="GroupCommunication\ScatterReduceDriverAndTasks\SlaveTask.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+      <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+      <Name>Org.Apache.REEF.Common</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+      <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+      <Name>Org.Apache.REEF.Driver</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+      <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
+      <Name>Org.Apache.REEF.Network</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Org.Apache.REEF.Tang</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+      <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+      <Name>Org.Apache.REEF.Wake</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..8fc8a33
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/Properties/AssemblyInfo.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network.Examples")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network.Examples")]
+[assembly: AssemblyCopyright("Copyright ©  2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a3eafd9d-1be4-44a7-9f1c-9a81a1e59897")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf b/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf
deleted file mode 100644
index 67256f5..0000000
Binary files a/lang/cs/Org.Apache.REEF.Tests/ConfigFiles/evaluator.conf and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
new file mode 100644
index 0000000..94529c1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    [TestClass]
+    public class BroadcastReduceTest : ReefFunctionalTest
+    {
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        [TestMethod]
+        public void TestBroadcastAndReduceOnLocalRuntime()
+        {
+            int numTasks = 9;
+            TestBroadcastAndReduce(false, numTasks);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+
+        [Ignore]
+        [TestMethod]
+        public void TestBroadcastAndReduceOnYarn()
+        {
+            int numTasks = 9;
+            TestBroadcastAndReduce(true, numTasks);
+        }
+
+        [TestMethod]
+        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+                    
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
new file mode 100644
index 0000000..0c918b5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    [TestClass]
+    public class PipelinedBroadcastReduceTest : ReefFunctionalTest
+    {
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        [TestMethod]
+        public void TestPipelinedBroadcastAndReduceOnLocalRuntime()
+        {
+            const int numTasks = 9;
+            TestBroadcastAndReduce(false, numTasks);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+
+        [Ignore]
+        [TestMethod]
+        public void TestPipelinedBroadcastAndReduceOnYarn()
+        {
+            const int numTasks = 9;
+            TestBroadcastAndReduce(true, numTasks);
+        }
+
+        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                 .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+                    GenericType<GroupTestConfig.ChunkSize>.Class,
+                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+                    
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
new file mode 100644
index 0000000..083261a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Group
+{
+    [TestClass]
+    public class ScatterReduceTest : ReefFunctionalTest
+    {
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        [TestMethod]
+        public void TestScatterAndReduceOnLocalRuntime()
+        {
+            int numTasks = 5;
+            TestScatterAndReduce(false, numTasks);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+
+        [Ignore]
+        [TestMethod]
+        public void TestScatterAndReduceOnYarn()
+        {
+            int numTasks = 5;
+            TestScatterAndReduce(true, numTasks);
+        }
+
+        [TestMethod]
+        public void TestScatterAndReduce(bool runOnYarn, int numTasks)
+        {
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+               .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+               .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+               .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+               .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
+               .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString())
+               .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
deleted file mode 100644
index 8cc32b8..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.Linq;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
-    public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver));
-
-        private readonly int _numEvaluators;
-        private readonly int _numIterations;
-
-        private readonly IMpiDriver _mpiDriver;
-        private readonly ICommunicationGroupDriver _commGroup;
-        private readonly TaskStarter _mpiTaskStarter;
-
-        [Inject]
-        public BroadcastReduceDriver(
-            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
-            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations,
-            MpiDriver mpiDriver)
-        {
-            Identifier = "BroadcastStartHandler";
-            _numEvaluators = numEvaluators;
-            _numIterations = numIterations;
-            _mpiDriver = mpiDriver;
-            _commGroup = _mpiDriver.DefaultGroup
-                    .AddBroadcast<int, IntCodec>(
-                        MpiTestConstants.BroadcastOperatorName,
-                       MpiTestConstants.MasterTaskId)
-                    .AddReduce<int, IntCodec>(
-                        MpiTestConstants.ReduceOperatorName,
-                            MpiTestConstants.MasterTaskId,
-                            new SumFunction())
-                    .Build();
-
-            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
-
-            CreateClassHierarchy();
-        }
-
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
-        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
-        {
-            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
-            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
-            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
-        }
-
-        public void OnNext(IActiveContext activeContext)
-        {
-            if (_mpiDriver.IsMasterTaskContext(activeContext))
-            {
-                // Configure Master Task
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId)
-                        .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
-                        .Build())
-                    .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                        GenericType<MpiTestConfig.NumEvaluators>.Class,
-                        _numEvaluators.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                        GenericType<MpiTestConfig.NumIterations>.Class,
-                        _numIterations.ToString(CultureInfo.InvariantCulture))
-                    .Build();
-
-                _commGroup.AddTask(MpiTestConstants.MasterTaskId);
-                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
-            }
-            else
-            {
-                // Configure Slave Task
-                string slaveTaskId = "SlaveTask-" + activeContext.Id;
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, slaveTaskId)
-                        .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
-                        .Build())
-                    .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                        GenericType<MpiTestConfig.NumEvaluators>.Class,
-                        _numEvaluators.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                        GenericType<MpiTestConfig.NumIterations>.Class,
-                        _numIterations.ToString(CultureInfo.InvariantCulture))
-                    .Build();
-
-                _commGroup.AddTask(slaveTaskId);
-                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
-            }
-        }
-
-        public void OnNext(IFailedEvaluator value)
-        {
-        }
-
-        public void OnError(Exception error)
-        {
-        }
-
-        public void OnCompleted()
-        {
-        }
-
-        private void CreateClassHierarchy()
-        {
-            HashSet<string> clrDlls = new HashSet<string>();
-            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
-            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
-            clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
-            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
-            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
-            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
-        }
-
-        private class SumFunction : IReduceFunction<int>
-        {
-            [Inject]
-            public SumFunction()
-            {
-            }
-
-            public int Reduce(IEnumerable<int> elements)
-            {
-                return elements.Sum();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
deleted file mode 100644
index fea51de..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Globalization;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
-    [TestClass]
-    public class BroadcastReduceTest : ReefFunctionalTest
-    {
-        [TestInitialize]
-        public void TestSetup()
-        {
-            CleanUp();
-        }
-
-        [TestCleanup]
-        public void TestCleanup()
-        {
-            CleanUp();
-        }
-
-        [TestMethod]
-        public void TestBroadcastAndReduceOnLocalRuntime()
-        {
-            int numTasks = 9;
-            TestBroadcastAndReduce(false, numTasks);
-            ValidateSuccessForLocalRuntime(numTasks);
-        }
-
-        [Ignore]
-        [TestMethod]
-        public void TestBroadcastAndReduceOnYarn()
-        {
-            int numTasks = 9;
-            TestBroadcastAndReduce(true, numTasks);
-        }
-
-        [TestMethod]
-        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
-        {
-            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
-                    .Build())
-                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                    GenericType<MpiTestConfig.NumIterations>.Class,
-                    MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
-                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                    GenericType<MpiTestConfig.NumEvaluators>.Class,
-                    numTasks.ToString(CultureInfo.InvariantCulture))
-                .Build();
-
-            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId)
-                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId)
-                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName)
-                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
-                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
-                .Build();
-
-            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
-                    
-            HashSet<string> appDlls = new HashSet<string>();
-            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
-            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
-            appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
-            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
-            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
-            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
deleted file mode 100644
index fd3084d..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
-    public class MasterTask : ITask
-    {
-        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
-
-        private readonly int _numIters;
-        private readonly int _numReduceSenders;
-
-        private readonly IMpiClient _mpiClient;
-        private readonly ICommunicationGroupClient _commGroup;
-        private readonly IBroadcastSender<int> _broadcastSender;
-        private readonly IReduceReceiver<int> _sumReducer;
-
-        [Inject]
-        public MasterTask(
-            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
-            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
-            IMpiClient mpiClient)
-        {
-            _logger.Log(Level.Info, "Hello from master task");
-            _numIters = numIters;
-            _numReduceSenders = numEvaluators - 1;
-            _mpiClient = mpiClient;
-
-            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
-            _broadcastSender = _commGroup.GetBroadcastSender<int>(MpiTestConstants.BroadcastOperatorName);
-            _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
-        }
-
-        public byte[] Call(byte[] memento)
-        {
-            for (int i = 1; i <= _numIters; i++)
-            {
-                // Each slave task calculates the nth triangle number
-                _broadcastSender.Send(i);
-                
-                // Sum up all of the calculated triangle numbers
-                int sum = _sumReducer.Reduce();
-                _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
-
-                int expected = TriangleNumber(i) * _numReduceSenders;
-                if (sum != TriangleNumber(i) * _numReduceSenders)
-                {
-                    throw new Exception("Expected " + expected + " but got " + sum);
-                }
-            }
-
-            return null;
-        }
-
-        public void Dispose()
-        {
-            _mpiClient.Dispose();
-        }
-
-        private int TriangleNumber(int n)
-        {
-            return Enumerable.Range(1, n).Sum();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
deleted file mode 100644
index 08fbde4..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
-{
-    public class SlaveTask : ITask
-    {
-        private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
-
-        private readonly int _numIterations;
-        private readonly IMpiClient _mpiClient;
-        private readonly ICommunicationGroupClient _commGroup;
-        private readonly IBroadcastReceiver<int> _broadcastReceiver;
-        private readonly IReduceSender<int> _triangleNumberSender;
-
-        [Inject]
-        public SlaveTask(
-            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
-            IMpiClient mpiClient)
-        {
-            _logger.Log(Level.Info, "Hello from slave task");
-
-            _numIterations = numIters;
-            _mpiClient = mpiClient;
-            _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
-            _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(MpiTestConstants.BroadcastOperatorName);
-            _triangleNumberSender = _commGroup.GetReduceSender<int>(MpiTestConstants.ReduceOperatorName);
-        }
-
-        public byte[] Call(byte[] memento)
-        {
-            for (int i = 0; i < _numIterations; i++)
-            {
-                // Receive n from Master Task
-                int n = _broadcastReceiver.Receive();
-                _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n);
-
-                // Calculate the nth Triangle number and send it back to driver
-                int triangleNum = TriangleNumber(n);
-                _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i);
-                _triangleNumberSender.Send(triangleNum);
-            }
-
-            return null;
-        }
-
-        public void Dispose()
-        {
-            _mpiClient.Dispose();
-        }
-
-        private int TriangleNumber(int n)
-        {
-            return Enumerable.Range(1, n).Sum();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
deleted file mode 100644
index a3989a0..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI
-{
-    public class MpiTestConfig
-    {
-        [NamedParameter("Number of iterations of messages to send")]
-        public class NumIterations : Name<int>
-        {
-        }
-
-        [NamedParameter("Number of Evaluators")]
-        public class NumEvaluators : Name<int>
-        {
-        }
-
-        [NamedParameter("tree width")]
-        public class FanOut : Name<int>
-        {
-        }
-
-        [NamedParameter("integer id of the evaluator")]
-        public class EvaluatorId : Name<string>
-        {
-        }
-
-        [NamedParameter("Size of the array")]
-        public class ArraySize : Name<int>
-        {
-        }
-
-        [NamedParameter("Chunk size for pipelining")]
-        public class ChunkSize : Name<int>
-        {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
deleted file mode 100644
index 668add3..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace Org.Apache.REEF.Tests.Functional.MPI
-{
-    internal class MpiTestConstants
-    {
-        public const string DriverId = "BroadcastReduceDriver";
-        public const string GroupName = "BroadcastReduceGroup";
-        public const string BroadcastOperatorName = "Broadcast";
-        public const string ReduceOperatorName = "Reduce";
-        public const string ScatterOperatorName = "Scatter";
-        public const string MasterTaskId = "MasterTask";
-        public const string SlaveTaskId = "SlaveTask-";
-        public const int NumIterations = 10;
-        public const int FanOut = 2;
-        public const int ChunkSize = 2;
-        public const int ArrayLength = 6;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
deleted file mode 100644
index 98a68dd..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.Linq;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Network.Group.Topology;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
-    public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
-
-        private readonly int _numEvaluators;
-        private readonly int _numIterations;
-        private readonly int _chunkSize;
-
-        private readonly IMpiDriver _mpiDriver;
-        private readonly ICommunicationGroupDriver _commGroup;
-        private readonly TaskStarter _mpiTaskStarter;
-
-        [Inject]
-        public PipelinedBroadcastReduceDriver(
-            [Parameter(typeof (MpiTestConfig.NumEvaluators))] int numEvaluators,
-            [Parameter(typeof (MpiTestConfig.NumIterations))] int numIterations,
-            [Parameter(typeof (MpiTestConfig.ChunkSize))] int chunkSize,
-            MpiDriver mpiDriver)
-        {
-            Logger.Log(Level.Info, "*******entering the driver code " + chunkSize);
-
-            Identifier = "BroadcastStartHandler";
-            _numEvaluators = numEvaluators;
-            _numIterations = numIterations;
-            _chunkSize = chunkSize;
-
-            _mpiDriver = mpiDriver;
-
-            _commGroup = _mpiDriver.DefaultGroup
-                .AddBroadcast<int[], IntArrayCodec>(
-                    MpiTestConstants.BroadcastOperatorName,
-                    MpiTestConstants.MasterTaskId,
-                    TopologyTypes.Tree,
-                    new PipelineIntDataConverter(_chunkSize))
-                .AddReduce<int[], IntArrayCodec>(
-                    MpiTestConstants.ReduceOperatorName,
-                    MpiTestConstants.MasterTaskId,
-                    new ArraySumFunction(),
-                    TopologyTypes.Tree,
-                    new PipelineIntDataConverter(_chunkSize))
-                .Build();
-
-            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
-
-            CreateClassHierarchy();
-        }
-
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
-        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
-        {
-            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
-            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
-            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
-        }
-
-        public void OnNext(IActiveContext activeContext)
-        {
-            if (_mpiDriver.IsMasterTaskContext(activeContext))
-            {
-                Logger.Log(Level.Info, "******* Master ID " + activeContext.Id );
-
-                // Configure Master Task
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId)
-                        .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
-                        .Build())
-                    .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                        GenericType<MpiTestConfig.NumEvaluators>.Class,
-                        _numEvaluators.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                        GenericType<MpiTestConfig.NumIterations>.Class,
-                        _numIterations.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.ArraySize, int>(
-                        GenericType<MpiTestConfig.ArraySize>.Class,
-                        MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
-                    .Build();
-
-                _commGroup.AddTask(MpiTestConstants.MasterTaskId);
-                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
-            }
-            else
-            {
-                // Configure Slave Task
-                string slaveTaskId = "SlaveTask-" + activeContext.Id;
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, slaveTaskId)
-                        .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
-                        .Build())
-                    .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                        GenericType<MpiTestConfig.NumEvaluators>.Class,
-                        _numEvaluators.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                        GenericType<MpiTestConfig.NumIterations>.Class,
-                        _numIterations.ToString(CultureInfo.InvariantCulture))
-                    .BindNamedParameter<MpiTestConfig.ArraySize, int>(
-                        GenericType<MpiTestConfig.ArraySize>.Class,
-                        MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
-                    .Build();
-
-                _commGroup.AddTask(slaveTaskId);
-                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
-            }
-        }
-
-        public void OnNext(IFailedEvaluator value)
-        {
-        }
-
-        public void OnError(Exception error)
-        {
-        }
-
-        public void OnCompleted()
-        {
-        }
-
-        private void CreateClassHierarchy()
-        {
-            HashSet<string> clrDlls = new HashSet<string>();
-            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
-            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
-            clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
-            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
-            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
-            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
-        }
-
-        private class SumFunction : IReduceFunction<int>
-        {
-            [Inject]
-            public SumFunction()
-            {
-            }
-
-            public int Reduce(IEnumerable<int> elements)
-            {
-                return elements.Sum();
-            }
-        }
-
-        private class ArraySumFunction : IReduceFunction<int[]>
-        {
-            [Inject]
-            public ArraySumFunction()
-            {
-            }
-
-            public int[] Reduce(IEnumerable<int[]> elements)
-            {
-                int[] result = null;
-                int count = 0;
-
-                foreach (var element in elements)
-                {
-                    if (count == 0)
-                    {
-                        result = element.Clone() as int[];
-                    }
-                    else
-                    {
-                        if (element.Length != result.Length)
-                        {
-                            throw new Exception("integer arrays are of different sizes");
-                        }
-
-                        for (int i = 0; i < result.Length; i++)
-                        {
-                            result[i] += element[i];
-                        }
-                    }
-
-                    count++;
-                }
-
-                return result;
-            }
-        }
-
-
-        private class IntArrayCodec : ICodec<int[]>
-        {
-            [Inject]
-            public IntArrayCodec()
-            {
-            }
-
-            public byte[] Encode(int[] obj)
-            {
-                byte[] result = new byte[sizeof(Int32) * obj.Length];
-                Buffer.BlockCopy(obj, 0, result, 0, result.Length);
-                return result;
-            }
-
-            public int[] Decode(byte[] data)
-            {
-                if (data.Length % sizeof(Int32) != 0)
-                {
-                    throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
-                }
-
-                int[] result = new int[data.Length / sizeof(Int32)];
-                Buffer.BlockCopy(data, 0, result, 0, data.Length);
-                return result;
-            }
-        }
-
-        public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
-        {
-            readonly int _chunkSize;
-            
-            [Inject]
-            public PipelineIntDataConverter([Parameter(typeof(MpiTestConfig.ChunkSize))] int chunkSize)
-            {
-                _chunkSize = chunkSize;
-            }
-
-            public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
-            {
-                List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
-                int totalChunks = message.Length / _chunkSize;
-
-                if (message.Length % _chunkSize != 0)
-                {
-                    totalChunks++;
-                }
-
-                int counter = 0;
-                for (int i = 0; i < message.Length; i += _chunkSize)
-                {
-                    int[] data = new int[Math.Min(_chunkSize, message.Length - i)];
-                    Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int));
-
-                    messageList.Add(counter == totalChunks - 1
-                        ? new PipelineMessage<int[]>(data, true)
-                        : new PipelineMessage<int[]>(data, false));
-
-                    counter++;
-                }
-
-                return messageList;
-            }
-
-            public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
-            {
-                int size = pipelineMessage.Select(x => x.Data.Length).Sum();
-                int[] data = new int[size];
-                int offset = 0;
-
-                foreach (var message in pipelineMessage)
-                {
-                    Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int));
-                    offset += message.Data.Length * sizeof(int);
-                }
-
-                return data;
-            }
-
-            public IConfiguration GetConfiguration()
-            {
-                return TangFactory.GetTang().NewConfigurationBuilder()
-                .BindNamedParameter<MpiTestConfig.ChunkSize, int>(GenericType<MpiTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture))
-                .Build();
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
deleted file mode 100644
index 05d0b9b..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Globalization;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
-    [TestClass]
-    public class PipelinedBroadcastReduceTest : ReefFunctionalTest
-    {
-        [TestInitialize]
-        public void TestSetup()
-        {
-            CleanUp();
-        }
-
-        [TestCleanup]
-        public void TestCleanup()
-        {
-            CleanUp();
-        }
-
-        [TestMethod]
-        public void TestPipelinedBroadcastAndReduceOnLocalRuntime()
-        {
-            const int numTasks = 9;
-            TestBroadcastAndReduce(false, numTasks);
-            ValidateSuccessForLocalRuntime(numTasks);
-        }
-
-        [Ignore]
-        [TestMethod]
-        public void TestPipelinedBroadcastAndReduceOnYarn()
-        {
-            const int numTasks = 9;
-            TestBroadcastAndReduce(true, numTasks);
-        }
-
-        public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
-        {
-            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
-                    .Build())
-                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
-                    GenericType<MpiTestConfig.NumIterations>.Class,
-                    MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
-                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
-                    GenericType<MpiTestConfig.NumEvaluators>.Class,
-                    numTasks.ToString(CultureInfo.InvariantCulture))
-                 .BindNamedParameter<MpiTestConfig.ChunkSize, int>(
-                    GenericType<MpiTestConfig.ChunkSize>.Class,
-                    MpiTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
-                .Build();
-
-            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId)
-                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId)
-                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName)
-                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture))
-                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
-                .Build();
-
-            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
-                    
-            HashSet<string> appDlls = new HashSet<string>();
-            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
-            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
-            appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
-            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
-            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
-
-            TestRun(appDlls, merged, runOnYarn, JavaLoggingSetting.VERBOSE);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
deleted file mode 100644
index 922f294..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
-    public class PipelinedMasterTask : ITask
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask));
-
-        private readonly int _numIters;
-        private readonly int _numReduceSenders;
-        private readonly int _arraySize;
-
-        private readonly IMpiClient _mpiClient;
-        private readonly ICommunicationGroupClient _commGroup;
-        private readonly IBroadcastSender<int[]> _broadcastSender;
-        private readonly IReduceReceiver<int[]> _sumReducer;
-
-        [Inject]
-        public PipelinedMasterTask(
-            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
-            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
-            [Parameter(typeof(MpiTestConfig.ArraySize))] int arraySize,
-            IMpiClient mpiClient)
-        {
-            Logger.Log(Level.Info, "Hello from master task");
-            _numIters = numIters;
-            _numReduceSenders = numEvaluators - 1;
-            _arraySize = arraySize;
-            _mpiClient = mpiClient;
-
-            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
-            _broadcastSender = _commGroup.GetBroadcastSender<int[]>(MpiTestConstants.BroadcastOperatorName);
-            _sumReducer = _commGroup.GetReduceReceiver<int[]>(MpiTestConstants.ReduceOperatorName);
-            Logger.Log(Level.Info, "finished master task constructor");
-        }
-
-        public byte[] Call(byte[] memento)
-        {
-            int[] intArr = new int[_arraySize];
-
-            for (int i = 1; i <= _numIters; i++)
-            {
-                for (int j = 0; j < _arraySize; j++)
-                {
-                    intArr[j] = i;
-                }
-
-                _broadcastSender.Send(intArr);
-                int[] sum = _sumReducer.Reduce();
-
-                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
-
-                int expected = TriangleNumber(i) * _numReduceSenders;
-
-                for (int j = 0; j < intArr.Length; j++)
-                {
-                    if (sum[j] != TriangleNumber(i) * _numReduceSenders)
-                    {
-                        throw new Exception("Expected " + expected + " but got " + sum);
-                    }
-                }
-            }
-
-            return null;
-        }
-
-        public void Dispose()
-        {
-            _mpiClient.Dispose();
-        }
-
-        private int TriangleNumber(int n)
-        {
-            return Enumerable.Range(1, n).Sum();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
deleted file mode 100644
index 5455121..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
-{
-    public class PipelinedSlaveTask : ITask
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask));
-
-        private readonly int _numIterations;
-        private readonly IMpiClient _mpiClient;
-        private readonly ICommunicationGroupClient _commGroup;
-        private readonly IBroadcastReceiver<int[]> _broadcastReceiver;
-        private readonly IReduceSender<int[]> _triangleNumberSender;
-
-        [Inject]
-        public PipelinedSlaveTask(
-            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
-            IMpiClient mpiClient)
-        {
-            Logger.Log(Level.Info, "Hello from slave task");
-
-            _numIterations = numIters;
-            _mpiClient = mpiClient;
-            _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
-            _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(MpiTestConstants.BroadcastOperatorName);
-            _triangleNumberSender = _commGroup.GetReduceSender<int[]>(MpiTestConstants.ReduceOperatorName);
-        }
-
-        public byte[] Call(byte[] memento)
-        {
-            for (int i = 0; i < _numIterations; i++)
-            {
-                // Receive n from Master Task
-                int[] intVec = _broadcastReceiver.Receive();
-
-                Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]);
-
-                // Calculate the nth Triangle number and send it back to driver
-                int triangleNum = TriangleNumber(intVec[0]);
-                Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i);
-
-                int[] resArr = new int[intVec.Length];
-
-                for (int j = 0; j < resArr.Length; j++)
-                {
-                    resArr[j] = triangleNum;
-                }
-
-                _triangleNumberSender.Send(resArr);
-            }
-
-            return null;
-        }
-
-        public void Dispose()
-        {
-            _mpiClient.Dispose();
-        }
-
-        private int TriangleNumber(int n)
-        {
-            return Enumerable.Range(1, n).Sum();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
deleted file mode 100644
index e563534..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Network.Group.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
-{
-    public class MasterTask : ITask
-    {
-        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
-
-        private readonly IMpiClient _mpiClient;
-        private readonly ICommunicationGroupClient _commGroup;
-        private readonly IScatterSender<int> _scatterSender;
-        private readonly IReduceReceiver<int> _sumReducer;
-
-        [Inject]
-        public MasterTask(IMpiClient mpiClient)
-        {
-            _logger.Log(Level.Info, "Hello from master task");
-            _mpiClient = mpiClient;
-
-            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
-            _scatterSender = _commGroup.GetScatterSender<int>(MpiTestConstants.ScatterOperatorName);
-            _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
-        }
-
-        public byte[] Call(byte[] memento)
-        {
-            List<int> data = Enumerable.Range(1, 100).ToList();
-            _scatterSender.Send(data);
-
-            int sum = _sumReducer.Reduce();
-            _logger.Log(Level.Info, "Received sum: {0}", sum);
-
-            return null;
-        }
-
-        public void Dispose()
-        {
-            _mpiClient.Dispose();
-        }
-
-        private List<string> GetScatterOrder()
-        {
-            return new List<string> { "SlaveTask-4", "SlaveTask-3", "SlaveTask-2", "SlaveTask-1" };
-        }
-    }
-}