You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/06/25 23:33:42 UTC
incubator-reef git commit: [REEF-406]: Give user an option to
configure the TCPPortProvider in BroadcastAndReduceClient and
PipelineBroadcastAndReduceClient
Repository: incubator-reef
Updated Branches:
refs/heads/master b459aafd8 -> f68ce6949
[REEF-406]: Give user an option to configure the TCPPortProvider in BroadcastAndReduceClient and PipelineBroadcastAndReduceClient
This addresses the issue by:
* Allowing the user to specify the starting port number and port range via the
command line.
* Binding this information in the driver configuration.
JIRA:
[REEF-406](https://issues.apache.org/jira/browse/REEF-406)
Pull Request:
This closes #243
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/f68ce694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/f68ce694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/f68ce694
Branch: refs/heads/master
Commit: f68ce6949f10ae90af4df25f59b626843581233b
Parents: b459aaf
Author: Dhruv <dh...@gmail.com>
Authored: Thu Jun 25 00:28:51 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jun 25 14:33:07 2015 -0700
----------------------------------------------------------------------
.../BroadcastAndReduceClient.cs | 8 +-
.../PipelineBroadcastAndReduceClient.cs | 21 ++-
.../Run.cs | 45 ++++++-
.../BroadcastReduceDriver.cs | 12 ++
.../GroupCommunication/GroupTestConfig.cs | 10 ++
.../PipelinedBroadcastReduceDriver.cs | 128 ++++++++++---------
.../PipelinedMasterTask.cs | 2 +-
7 files changed, 153 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
index 559d936..49a9b45 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Network.Examples.Client
{
class BroadcastAndReduceClient
{
- public void RunBroadcastAndReduce(bool runOnYarn, int numTasks)
+ public void RunBroadcastAndReduce(bool runOnYarn, int numTasks, int startingPortNo, int portRange)
{
const int numIterations = 10;
const string driverId = "BroadcastReduceDriver";
@@ -60,6 +60,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
.BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
GenericType<GroupTestConfig.NumEvaluators>.Class,
numTasks.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<GroupTestConfig.StartingPort, int>(
+ GenericType<GroupTestConfig.StartingPort>.Class,
+ startingPortNo.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<GroupTestConfig.PortRange, int>(
+ GenericType<GroupTestConfig.PortRange>.Class,
+ portRange.ToString(CultureInfo.InvariantCulture))
.Build();
IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
index c86697b..16aa698 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
@@ -41,13 +41,15 @@ namespace Org.Apache.REEF.Network.Examples.Client
{
public class PipelineBroadcastAndReduceClient
{
- public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks)
+ public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks, int startingPortNo, int portRange, int arraySize, int chunkSize)
{
IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
DriverBridgeConfiguration.ConfigurationModule
.Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
+ GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
+ GenericType<PipelinedBroadcastReduceDriver>.Class)
.Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
.Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
.Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
@@ -58,9 +60,18 @@ namespace Org.Apache.REEF.Network.Examples.Client
.BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
GenericType<GroupTestConfig.NumEvaluators>.Class,
numTasks.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+ .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
GenericType<GroupTestConfig.ChunkSize>.Class,
- GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+ chunkSize.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<GroupTestConfig.StartingPort, int>(
+ GenericType<GroupTestConfig.StartingPort>.Class,
+ startingPortNo.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<GroupTestConfig.PortRange, int>(
+ GenericType<GroupTestConfig.PortRange>.Class,
+ portRange.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<GroupTestConfig.ArraySize, int>(
+ GenericType<GroupTestConfig.ArraySize>.Class,
+ arraySize.ToString(CultureInfo.InvariantCulture))
.Build();
IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
index 899e548..e4d10da 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
@@ -19,6 +19,8 @@
using System;
using System.Collections.Generic;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Network.Examples.Client
{
@@ -28,7 +30,11 @@ namespace Org.Apache.REEF.Network.Examples.Client
{
Console.WriteLine("start running client: " + DateTime.Now);
bool runOnYarn = false;
- List<string> testToRun = new List<string>();
+ int numNodes = 9;
+ int startPort = 8900;
+ int portRange = 1000;
+ string testToRun = "RunBroadcastAndReduce";
+
if (args != null)
{
if (args.Length > 0)
@@ -36,21 +42,46 @@ namespace Org.Apache.REEF.Network.Examples.Client
runOnYarn = bool.Parse(args[0].ToLower());
}
- for (int i = 1; i < args.Length; i++)
+ if (args.Length > 1)
{
- testToRun.Add(args[i].ToLower());
+ numNodes = int.Parse(args[1]);
+ }
+
+ if (args.Length > 2)
+ {
+ startPort = int.Parse(args[2]);
+ }
+
+ if (args.Length > 3)
+ {
+ portRange = int.Parse(args[3]);
+ }
+
+ if (args.Length > 4)
+ {
+ testToRun = args[4].ToLower();
}
}
- if (testToRun.Contains("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0)
+ if (testToRun.Equals("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
{
- new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, 9);
+ int arraySize = GroupTestConstants.ArrayLength;
+ int chunkSize = GroupTestConstants.ChunkSize;
+
+ if (args.Length > 5)
+ {
+ arraySize = int.Parse(args[5]);
+ chunkSize = int.Parse(args[6]);
+ }
+
+ new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, numNodes, startPort,
+ portRange, arraySize, chunkSize);
Console.WriteLine("RunPipelineBroadcastAndReduce completed!!!");
}
- if (testToRun.Contains("RunBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0)
+ if (testToRun.Equals("RunBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
{
- new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, 9);
+ new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, numNodes, startPort, portRange);
Console.WriteLine("RunBroadcastAndReduce completed!!!");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index fd27557..1d91e63 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -42,6 +42,7 @@ using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
{
@@ -55,11 +56,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
private readonly IGroupCommDriver _groupCommDriver;
private readonly ICommunicationGroupDriver _commGroup;
private readonly TaskStarter _groupCommTaskStarter;
+ private readonly IConfiguration _tcpPortProviderConfig;
[Inject]
public BroadcastReduceDriver(
[Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
[Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+ [Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
+ [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
GroupCommDriver groupCommDriver)
{
Identifier = "BroadcastStartHandler";
@@ -67,6 +71,13 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
_numIterations = numIterations;
_groupCommDriver = groupCommDriver;
+ _tcpPortProviderConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindNamedParameter<TcpPortRangeStart, int>(GenericType<TcpPortRangeStart>.Class,
+ startingPort.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<TcpPortRangeCount, int>(GenericType<TcpPortRangeCount>.Class,
+ portRange.ToString(CultureInfo.InvariantCulture))
+ .Build();
+
IConfiguration codecConfig = CodecConfiguration<int>.Conf
.Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class)
.Build();
@@ -112,6 +123,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
{
IConfiguration contextConf = _groupCommDriver.GetContextConfiguration();
IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration();
+ serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig);
allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
index c7d93c1..e329788 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
@@ -52,5 +52,15 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication
public class ChunkSize : Name<int>
{
}
+
+ [NamedParameter(Documentation = "Starting port for TcpPortProvider", DefaultValue = "8900")]
+ public class StartingPort : Name<int>
+ {
+ }
+
+ [NamedParameter(Documentation = "Port Range count for TcpPortProvider", DefaultValue = "1000")]
+ public class PortRange : Name<int>
+ {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 6d06787..4cf8486 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -42,27 +41,30 @@ using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
{
- public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+ public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>,
+ IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
-
- private readonly int _numEvaluators;
- private readonly int _numIterations;
- private readonly int _chunkSize;
-
- private readonly IGroupCommDriver _groupCommDriver;
+ private readonly int _arraySize;
private readonly ICommunicationGroupDriver _commGroup;
+ private readonly IGroupCommDriver _groupCommDriver;
private readonly TaskStarter _groupCommTaskStarter;
+ private readonly int _numEvaluators;
+ private readonly int _numIterations;
+ private readonly IConfiguration _tcpPortProviderConfig;
[Inject]
public PipelinedBroadcastReduceDriver(
- [Parameter(typeof (GroupTestConfig.NumEvaluators))] int numEvaluators,
+ [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
[Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+ [Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
+ [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
[Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
+ [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
GroupCommDriver groupCommDriver)
{
Logger.Log(Level.Info, "entering the driver code " + chunkSize);
@@ -70,24 +72,31 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
Identifier = "BroadcastStartHandler";
_numEvaluators = numEvaluators;
_numIterations = numIterations;
- _chunkSize = chunkSize;
+ _arraySize = arraySize;
+
+ _tcpPortProviderConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindNamedParameter<TcpPortRangeStart, int>(GenericType<TcpPortRangeStart>.Class,
+ startingPort.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<TcpPortRangeCount, int>(GenericType<TcpPortRangeCount>.Class,
+ portRange.ToString(CultureInfo.InvariantCulture))
+ .Build();
- IConfiguration codecConfig = CodecConfiguration<int[]>.Conf
+ var codecConfig = CodecConfiguration<int[]>.Conf
.Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class)
.Build();
- IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
+ var reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf
.Set(ReduceFunctionConfiguration<int[]>.ReduceFunction, GenericType<ArraySumFunction>.Class)
.Build();
- IConfiguration dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
+ var dataConverterConfig = TangFactory.GetTang().NewConfigurationBuilder(
PipelineDataConverterConfiguration<int[]>.Conf
.Set(PipelineDataConverterConfiguration<int[]>.DataConverter,
GenericType<PipelineIntDataConverter>.Class)
.Build())
.BindNamedParameter<GroupTestConfig.ChunkSize, int>(
GenericType<GroupTestConfig.ChunkSize>.Class,
- GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+ chunkSize.ToString(CultureInfo.InvariantCulture))
.Build();
_groupCommDriver = groupCommDriver;
@@ -113,29 +122,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
CreateClassHierarchy();
}
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
-
- public void OnNext(IAllocatedEvaluator allocatedEvaluator)
- {
- IConfiguration contextConf = _groupCommDriver.GetContextConfiguration();
- IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration();
- allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
- }
-
public void OnNext(IActiveContext activeContext)
{
if (_groupCommDriver.IsMasterTaskContext(activeContext))
{
- Logger.Log(Level.Info, "******* Master ID " + activeContext.Id );
+ Logger.Log(Level.Info, "******* Master ID " + activeContext.Id);
// Configure Master Task
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
+ var partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId)
.Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
@@ -148,7 +142,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
_numIterations.ToString(CultureInfo.InvariantCulture))
.BindNamedParameter<GroupTestConfig.ArraySize, int>(
GenericType<GroupTestConfig.ArraySize>.Class,
- GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+ _arraySize.ToString(CultureInfo.InvariantCulture))
.Build();
_commGroup.AddTask(GroupTestConstants.MasterTaskId);
@@ -157,8 +151,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
else
{
// Configure Slave Task
- string slaveTaskId = "SlaveTask-" + activeContext.Id;
- IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
+ var slaveTaskId = "SlaveTask-" + activeContext.Id;
+ var partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
TaskConfiguration.ConfigurationModule
.Set(TaskConfiguration.Identifier, slaveTaskId)
.Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
@@ -171,7 +165,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
_numIterations.ToString(CultureInfo.InvariantCulture))
.BindNamedParameter<GroupTestConfig.ArraySize, int>(
GenericType<GroupTestConfig.ArraySize>.Class,
- GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+ _arraySize.ToString(CultureInfo.InvariantCulture))
.Build();
_commGroup.AddTask(slaveTaskId);
@@ -179,8 +173,18 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
}
}
- public void OnNext(IFailedEvaluator value)
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
+ var contextConf = _groupCommDriver.GetContextConfiguration();
+ var serviceConf = _groupCommDriver.GetServiceConfiguration();
+ serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig);
+ allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+ }
+
+ public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+ {
+ var request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+ evaluatorRequestor.Submit(request);
}
public void OnError(Exception error)
@@ -191,9 +195,15 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
{
}
+ public void OnNext(IFailedEvaluator value)
+ {
+ }
+
+ public string Identifier { get; set; }
+
private void CreateClassHierarchy()
{
- HashSet<string> clrDlls = new HashSet<string>();
+ var clrDlls = new HashSet<string>();
clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
@@ -226,7 +236,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public int[] Reduce(IEnumerable<int[]> elements)
{
int[] result = null;
- int count = 0;
+ var count = 0;
foreach (var element in elements)
{
@@ -241,7 +251,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
throw new Exception("integer arrays are of different sizes");
}
- for (int i = 0; i < result.Length; i++)
+ for (var i = 0; i < result.Length; i++)
{
result[i] += element[i];
}
@@ -254,7 +264,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
}
}
-
private class IntArrayCodec : ICodec<int[]>
{
[Inject]
@@ -264,19 +273,20 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public byte[] Encode(int[] obj)
{
- byte[] result = new byte[sizeof(Int32) * obj.Length];
+ var result = new byte[sizeof (Int32)*obj.Length];
Buffer.BlockCopy(obj, 0, result, 0, result.Length);
return result;
}
public int[] Decode(byte[] data)
{
- if (data.Length % sizeof(Int32) != 0)
+ if (data.Length%sizeof (Int32) != 0)
{
- throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
+ throw new Exception(
+ "error inside integer array decoder, byte array length not a multiple of interger size");
}
- int[] result = new int[data.Length / sizeof(Int32)];
+ var result = new int[data.Length/sizeof (Int32)];
Buffer.BlockCopy(data, 0, result, 0, data.Length);
return result;
}
@@ -284,8 +294,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
{
- readonly int _chunkSize;
-
+ private readonly int _chunkSize;
+
[Inject]
public PipelineIntDataConverter([Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize)
{
@@ -294,19 +304,19 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
{
- List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
- int totalChunks = message.Length / _chunkSize;
+ var messageList = new List<PipelineMessage<int[]>>();
+ var totalChunks = message.Length/_chunkSize;
- if (message.Length % _chunkSize != 0)
+ if (message.Length%_chunkSize != 0)
{
totalChunks++;
}
- int counter = 0;
- for (int i = 0; i < message.Length; i += _chunkSize)
+ var counter = 0;
+ for (var i = 0; i < message.Length; i += _chunkSize)
{
- int[] data = new int[Math.Min(_chunkSize, message.Length - i)];
- Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int));
+ var data = new int[Math.Min(_chunkSize, message.Length - i)];
+ Buffer.BlockCopy(message, i*sizeof (int), data, 0, data.Length*sizeof (int));
messageList.Add(counter == totalChunks - 1
? new PipelineMessage<int[]>(data, true)
@@ -320,14 +330,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
{
- int size = pipelineMessage.Select(x => x.Data.Length).Sum();
- int[] data = new int[size];
- int offset = 0;
+ var size = pipelineMessage.Select(x => x.Data.Length).Sum();
+ var data = new int[size];
+ var offset = 0;
foreach (var message in pipelineMessage)
{
- Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int));
- offset += message.Data.Length * sizeof(int);
+ Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length*sizeof (int));
+ offset += message.Data.Length*sizeof (int);
}
return data;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f68ce694/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
index 416fe41..4656fd9 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
@@ -73,7 +73,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
_broadcastSender.Send(intArr);
int[] sum = _sumReducer.Reduce();
- Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
+ Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} with array length: {2}", sum[0], i, sum.Length);
int expected = TriangleNumber(i) * _numReduceSenders;