You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/06/25 23:33:42 UTC

incubator-reef git commit: [REEF-406]: Give user an option to configure the TCPPortProvider in BroadcastAndReduceClient and PipelineBroadcastAndReduceClient

Repository: incubator-reef
Updated Branches:
  refs/heads/master b459aafd8 -> f68ce6949


[REEF-406]: Give user an option to configure the TCPPortProvider in BroadcastAndReduceClient and PipelineBroadcastAndReduceClient

This addressed the issue by
	* Allowing user to specify the starting port number and range via
	  command line.
	* Binding this information in the driver configuration.

JIRA:
  [REEF-406](https://issues.apache.org/jira/browse/REEF-406)

Pull Request:
  This closes #243


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/f68ce694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/f68ce694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/f68ce694

Branch: refs/heads/master
Commit: f68ce6949f10ae90af4df25f59b626843581233b
Parents: b459aaf
Author: Dhruv <dh...@gmail.com>
Authored: Thu Jun 25 00:28:51 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jun 25 14:33:07 2015 -0700

----------------------------------------------------------------------
 .../BroadcastAndReduceClient.cs                 |   8 +-
 .../PipelineBroadcastAndReduceClient.cs         |  21 ++-
 .../Run.cs                                      |  45 ++++++-
 .../BroadcastReduceDriver.cs                    |  12 ++
 .../GroupCommunication/GroupTestConfig.cs       |  10 ++
 .../PipelinedBroadcastReduceDriver.cs           | 128 ++++++++++---------
 .../PipelinedMasterTask.cs                      |   2 +-
 7 files changed, 153 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
index 559d936..49a9b45 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Network.Examples.Client
 {
     class BroadcastAndReduceClient
     {
-        public void RunBroadcastAndReduce(bool runOnYarn, int numTasks)
+        public void RunBroadcastAndReduce(bool runOnYarn, int numTasks, int startingPortNo, int portRange)
         {
             const int numIterations = 10;
             const string driverId = "BroadcastReduceDriver";
@@ -60,6 +60,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
                 .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
                     GenericType<GroupTestConfig.NumEvaluators>.Class,
                     numTasks.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.StartingPort, int>(
+                    GenericType<GroupTestConfig.StartingPort>.Class,
+                    startingPortNo.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.PortRange, int>(
+                    GenericType<GroupTestConfig.PortRange>.Class,
+                    portRange.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
             IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
index c86697b..16aa698 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
@@ -41,13 +41,15 @@ namespace Org.Apache.REEF.Network.Examples.Client
 {
     public class PipelineBroadcastAndReduceClient
     {
-        public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks)
+        public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks, int startingPortNo, int portRange, int arraySize, int chunkSize)
         {
             IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
                 DriverBridgeConfiguration.ConfigurationModule
                     .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
+                        GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
+                        GenericType<PipelinedBroadcastReduceDriver>.Class)
                     .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
                     .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
                     .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
@@ -58,9 +60,18 @@ namespace Org.Apache.REEF.Network.Examples.Client
                 .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
                     GenericType<GroupTestConfig.NumEvaluators>.Class,
                     numTasks.ToString(CultureInfo.InvariantCulture))
-                 .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+                .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
                     GenericType<GroupTestConfig.ChunkSize>.Class,
-                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                    chunkSize.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.StartingPort, int>(
+                    GenericType<GroupTestConfig.StartingPort>.Class,
+                    startingPortNo.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.PortRange, int>(
+                    GenericType<GroupTestConfig.PortRange>.Class,
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.ArraySize, int>(
+                    GenericType<GroupTestConfig.ArraySize>.Class,
+                    arraySize.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
             IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
index 899e548..e4d10da 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
@@ -19,6 +19,8 @@
 
 using System;
 using System.Collections.Generic;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Network.Examples.Client
 {
@@ -28,7 +30,11 @@ namespace Org.Apache.REEF.Network.Examples.Client
         {
             Console.WriteLine("start running client: " + DateTime.Now);
             bool runOnYarn = false;
-            List<string> testToRun = new List<string>();
+            int numNodes = 9;
+            int startPort = 8900;
+            int portRange = 1000;
+            string testToRun = "RunBroadcastAndReduce";
+            
             if (args != null)
             {
                 if (args.Length > 0)
@@ -36,21 +42,46 @@ namespace Org.Apache.REEF.Network.Examples.Client
                     runOnYarn = bool.Parse(args[0].ToLower());
                 }
 
-                for (int i = 1; i < args.Length; i++)
+                if (args.Length > 1)
                 {
-                    testToRun.Add(args[i].ToLower());
+                    numNodes = int.Parse(args[1]);
+                }
+
+                if (args.Length > 2)
+                {
+                    startPort = int.Parse(args[2]);
+                }
+
+                if (args.Length > 3)
+                {
+                    portRange = int.Parse(args[3]);
+                }
+
+                if (args.Length > 4)
+                {
+                    testToRun = args[4].ToLower();
                 }
             }
 
-            if (testToRun.Contains("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0)
+            if (testToRun.Equals("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
             {
-                new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, 9);
+                int arraySize = GroupTestConstants.ArrayLength;
+                int chunkSize = GroupTestConstants.ChunkSize;
+
+                if (args.Length > 5)
+                {
+                    arraySize = int.Parse(args[5]);
+                    chunkSize = int.Parse(args[6]);
+                }
+
+                new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, numNodes, startPort,
+                    portRange, arraySize, chunkSize);
                 Console.WriteLine("RunPipelineBroadcastAndReduce completed!!!");
             }
 
-            if (testToRun.Contains("RunBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0)
+            if (testToRun.Equals("RunBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
             {
-                new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, 9);
+                new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, numNodes, startPort, portRange);
                 Console.WriteLine("RunBroadcastAndReduce completed!!!");
             }           
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index fd27557..1d91e63 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -42,6 +42,7 @@ using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
 {
@@ -55,11 +56,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         private readonly IGroupCommDriver _groupCommDriver;
         private readonly ICommunicationGroupDriver _commGroup;
         private readonly TaskStarter _groupCommTaskStarter;
+        private readonly IConfiguration _tcpPortProviderConfig;
 
         [Inject]
         public BroadcastReduceDriver(
             [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
             [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+            [Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
+            [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
             GroupCommDriver groupCommDriver)
         {
             Identifier = "BroadcastStartHandler";
@@ -67,6 +71,13 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             _numIterations = numIterations;
             _groupCommDriver = groupCommDriver;
 
+            _tcpPortProviderConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<TcpPortRangeStart, int>(GenericType<TcpPortRangeStart>.Class,
+                    startingPort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<TcpPortRangeCount, int>(GenericType<TcpPortRangeCount>.Class,
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
             IConfiguration codecConfig = CodecConfiguration<int>.Conf
                 .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
                 .Build();
@@ -112,6 +123,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         {
             IConfiguration contextConf = _groupCommDriver.GetContextConfiguration();
             IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration();
+            serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig);
             allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
index c7d93c1..e329788 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
@@ -52,5 +52,15 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication
         public class ChunkSize : Name<int>
         {
         }
+
+        [NamedParameter(Documentation = "Starting port for TcpPortProvider", DefaultValue = "8900")]
+        public class StartingPort : Name<int>
+        {
+        }
+
+        [NamedParameter(Documentation = "Port Range count for TcpPortProvider", DefaultValue = "1000")]
+        public class PortRange : Name<int>
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 6d06787..4cf8486 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -42,27 +41,30 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
 {
-    public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>,
+        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
-
-        private readonly int _numEvaluators;
-        private readonly int _numIterations;
-        private readonly int _chunkSize;
-
-        private readonly IGroupCommDriver _groupCommDriver;
+        private readonly int _arraySize;
         private readonly ICommunicationGroupDriver _commGroup;
+        private readonly IGroupCommDriver _groupCommDriver;
         private readonly TaskStarter _groupCommTaskStarter;
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+        private readonly IConfiguration _tcpPortProviderConfig;
 
         [Inject]
         public PipelinedBroadcastReduceDriver(
-            [Parameter(typeof (GroupTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
             [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+            [Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
+            [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
             [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
+            [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
             GroupCommDriver groupCommDriver)
         {
             Logger.Log(Level.Info, "entering the driver code " + chunkSize);
@@ -70,24 +72,31 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
             _numIterations = numIterations;
-            _chunkSize = chunkSize;
+            _arraySize = arraySize;
+
+            _tcpPortProviderConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<TcpPortRangeStart, int>(GenericType<TcpPortRangeStart>.Class,
+                    startingPort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<TcpPortRangeCount, int>(GenericType<TcpPortRangeCount>.Class,
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .Build();
 
-            IConfiguration codecConfig = CodecConfiguration<int[]>.Conf
+            var codecConfig = CodecConfiguration<int[]>.Conf
                 .Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class)
                 .Build();
 
-            IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
+            var reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
                 .Set(ReduceFunctionConfiguration<int[]>.ReduceFunction, GenericType<ArraySumFunction>.Class)
                 .Build();
 
-            IConfiguration dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
+            var dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
                 PipelineDataConverterConfiguration<int[]>.Conf
                     .Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
                         GenericType<PipelineIntDataConverter>.Class)
                     .Build())
                 .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
                     GenericType<GroupTestConfig.ChunkSize>.Class,
-                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                    chunkSize.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
             _groupCommDriver = groupCommDriver;
@@ -113,29 +122,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             CreateClassHierarchy();
         }
 
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
-        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
-        {
-            IConfiguration contextConf = _groupCommDriver.GetContextConfiguration();
-            IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration();
-            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
-        }
-
         public void OnNext(IActiveContext activeContext)
         {
             if (_groupCommDriver.IsMasterTaskContext(activeContext))
             {
-                Logger.Log(Level.Info, "******* Master ID " + activeContext.Id );
+                Logger.Log(Level.Info, "******* Master ID " + activeContext.Id);
 
                 // Configure Master Task
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
+                var partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
                     TaskConfiguration.ConfigurationModule
                         .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId)
                         .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
@@ -148,7 +142,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                         _numIterations.ToString(CultureInfo.InvariantCulture))
                     .BindNamedParameter<GroupTestConfig.ArraySize, int>(
                         GenericType<GroupTestConfig.ArraySize>.Class,
-                        GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+                        _arraySize.ToString(CultureInfo.InvariantCulture))
                     .Build();
 
                 _commGroup.AddTask(GroupTestConstants.MasterTaskId);
@@ -157,8 +151,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             else
             {
                 // Configure Slave Task
-                string slaveTaskId = "SlaveTask-" + activeContext.Id;
-                IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
+                var slaveTaskId = "SlaveTask-" + activeContext.Id;
+                var partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
                     TaskConfiguration.ConfigurationModule
                         .Set(TaskConfiguration.Identifier, slaveTaskId)
                         .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
@@ -171,7 +165,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                         _numIterations.ToString(CultureInfo.InvariantCulture))
                     .BindNamedParameter<GroupTestConfig.ArraySize, int>(
                         GenericType<GroupTestConfig.ArraySize>.Class,
-                        GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+                        _arraySize.ToString(CultureInfo.InvariantCulture))
                     .Build();
 
                 _commGroup.AddTask(slaveTaskId);
@@ -179,8 +173,18 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             }
         }
 
-        public void OnNext(IFailedEvaluator value)
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
+            var contextConf = _groupCommDriver.GetContextConfiguration();
+            var serviceConf = _groupCommDriver.GetServiceConfiguration();
+            serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig);
+            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+        }
+
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            var request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
         }
 
         public void OnError(Exception error)
@@ -191,9 +195,15 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         {
         }
 
+        public void OnNext(IFailedEvaluator value)
+        {
+        }
+
+        public string Identifier { get; set; }
+
         private void CreateClassHierarchy()
         {
-            HashSet<string> clrDlls = new HashSet<string>();
+            var clrDlls = new HashSet<string>();
             clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
             clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
@@ -226,7 +236,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             public int[] Reduce(IEnumerable<int[]> elements)
             {
                 int[] result = null;
-                int count = 0;
+                var count = 0;
 
                 foreach (var element in elements)
                 {
@@ -241,7 +251,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                             throw new Exception("integer arrays are of different sizes");
                         }
 
-                        for (int i = 0; i < result.Length; i++)
+                        for (var i = 0; i < result.Length; i++)
                         {
                             result[i] += element[i];
                         }
@@ -254,7 +264,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             }
         }
 
-
         private class IntArrayCodec : ICodec<int[]>
         {
             [Inject]
@@ -264,19 +273,20 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
             public byte[] Encode(int[] obj)
             {
-                byte[] result = new byte[sizeof(Int32) * obj.Length];
+                var result = new byte[sizeof (Int32)*obj.Length];
                 Buffer.BlockCopy(obj, 0, result, 0, result.Length);
                 return result;
             }
 
             public int[] Decode(byte[] data)
             {
-                if (data.Length % sizeof(Int32) != 0)
+                if (data.Length%sizeof (Int32) != 0)
                 {
-                    throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
+                    throw new Exception(
+                        "error inside integer array decoder, byte array length not a multiple of interger size");
                 }
 
-                int[] result = new int[data.Length / sizeof(Int32)];
+                var result = new int[data.Length/sizeof (Int32)];
                 Buffer.BlockCopy(data, 0, result, 0, data.Length);
                 return result;
             }
@@ -284,8 +294,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
         public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
         {
-            readonly int _chunkSize;
-            
+            private readonly int _chunkSize;
+
             [Inject]
             public PipelineIntDataConverter([Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize)
             {
@@ -294,19 +304,19 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
             public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
             {
-                List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
-                int totalChunks = message.Length / _chunkSize;
+                var messageList = new List<PipelineMessage<int[]>>();
+                var totalChunks = message.Length/_chunkSize;
 
-                if (message.Length % _chunkSize != 0)
+                if (message.Length%_chunkSize != 0)
                 {
                     totalChunks++;
                 }
 
-                int counter = 0;
-                for (int i = 0; i < message.Length; i += _chunkSize)
+                var counter = 0;
+                for (var i = 0; i < message.Length; i += _chunkSize)
                 {
-                    int[] data = new int[Math.Min(_chunkSize, message.Length - i)];
-                    Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int));
+                    var data = new int[Math.Min(_chunkSize, message.Length - i)];
+                    Buffer.BlockCopy(message, i*sizeof (int), data, 0, data.Length*sizeof (int));
 
                     messageList.Add(counter == totalChunks - 1
                         ? new PipelineMessage<int[]>(data, true)
@@ -320,14 +330,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
             public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
             {
-                int size = pipelineMessage.Select(x => x.Data.Length).Sum();
-                int[] data = new int[size];
-                int offset = 0;
+                var size = pipelineMessage.Select(x => x.Data.Length).Sum();
+                var data = new int[size];
+                var offset = 0;
 
                 foreach (var message in pipelineMessage)
                 {
-                    Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int));
-                    offset += message.Data.Length * sizeof(int);
+                    Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length*sizeof (int));
+                    offset += message.Data.Length*sizeof (int);
                 }
 
                 return data;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
index 416fe41..4656fd9 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
@@ -73,7 +73,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
                 _broadcastSender.Send(intArr);
                 int[] sum = _sumReducer.Reduce();
 
-                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
+                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} with array length: {2}", sum[0], i, sum.Length);
 
                 int expected = TriangleNumber(i) * _numReduceSenders;