You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/07/15 21:13:09 UTC
reef git commit: [REEF-1183] Enable KMeans .NET tests
Repository: reef
Updated Branches:
refs/heads/master d0b675238 -> 83e5e8667
[REEF-1183] Enable KMeans .NET tests
This addressed the issue by
* removing dependency on an obsolete file
* adding code for on-the-fly input file generation
* fixing KMeans tests to generate an input file before running and take it as input
* fixing a bug by removing the default value for DataPartitionCache.PartitionIndex
* fixing a bug where only one evaluator was requested
* fixing a bug where partial means were miscalculated in the algorithm
* fixing a bug where CodecConfiguration was used instead of CodecToStreamingCodecConfiguration
JIRA:
[REEF-1183](https://issues.apache.org/jira/browse/REEF-1183)
Pull Request:
This closes #1053
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/83e5e866
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/83e5e866
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/83e5e866
Branch: refs/heads/master
Commit: 83e5e8667320b133ca75e1763627f18af28307b7
Parents: d0b6752
Author: Jason (Joo Seong) Jeong <cu...@gmail.com>
Authored: Wed Jun 22 18:14:52 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Fri Jul 15 14:11:43 2016 -0700
----------------------------------------------------------------------
.../KMeans/DataPartitionCache.cs | 2 +-
.../KMeans/KMeansDriverHandlers.cs | 12 ++--
.../MachineLearning/KMeans/KMeansMasterTask.cs | 3 +-
.../MachineLearning/KMeans/PartialMean.cs | 11 ++-
.../Functional/ML/KMeans/TestKMeans.cs | 72 ++++++++++++++------
5 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
index 4c60a0f..460cd0b 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
return ReadDataFile(file);
}
- [NamedParameter("Data partition index", "partition", "")]
+ [NamedParameter("Data partition index", "partition")]
public class PartitionIndex : Name<int>
{
}
http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index e0e75f8..a266a62 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -73,9 +73,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
CommandLineArguments arguments)
{
_executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
- string dataFile = arguments.Arguments.Single(a => a.StartsWith("DataFile", StringComparison.Ordinal)).Split(':')[1];
+ string dataFile = arguments.Arguments.First();
DataVector.ShuffleDataAndGetInitialCentriods(
- Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", dataFile),
+ dataFile,
numPartitions,
_clustersNumber,
_executionDirectory);
@@ -86,7 +86,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
_evaluatorRequestor = evaluatorRequestor;
_centroidCodecConf = CodecToStreamingCodecConfiguration<Centroids>.Conf
- .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
+ .Set(CodecToStreamingCodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
.Build();
IConfiguration dataConverterConfig1 = PipelineDataConverterConfiguration<Centroids>.Conf
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
.Build();
_controlMessageCodecConf = CodecToStreamingCodecConfiguration<ControlMessage>.Conf
- .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
+ .Set(CodecToStreamingCodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
.Build();
IConfiguration dataConverterConfig2 = PipelineDataConverterConfiguration<ControlMessage>.Conf
@@ -102,7 +102,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
.Build();
_processedResultsCodecConf = CodecToStreamingCodecConfiguration<ProcessedResults>.Conf
- .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
+ .Set(CodecToStreamingCodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
.Build();
IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf
@@ -187,7 +187,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
public void OnNext(IDriverStarted value)
{
- var request = _evaluatorRequestor.NewBuilder().SetCores(1).SetMegabytes(2048).Build();
+ var request = _evaluatorRequestor.NewBuilder().SetNumber(_totalEvaluators).SetCores(1).SetMegabytes(2048).Build();
_evaluatorRequestor.Submit(request);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
index c1b3e90..140897b 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
@@ -140,7 +140,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
for (int i = 0; i < clustersNum; i++)
{
List<PartialMean> means = totalList.Where(m => m.Mean.Label == i).ToList();
- aggregatedMeans.Add(new PartialMean(PartialMean.AggreatedMean(means), means.Count));
+ PartialMean aggregatedPartialMean = PartialMean.AggregatedPartialMean(means);
+ aggregatedMeans.Add(new PartialMean(aggregatedPartialMean.Mean, aggregatedPartialMean.Size));
}
ProcessedResults returnMeans = new ProcessedResults(aggregatedMeans, aggregatedLoss);
http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
index afeda32..f923a61 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
@@ -53,7 +53,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
return new PartialMean(DataVector.FromString(parts[0]), int.Parse(parts[1], CultureInfo.InvariantCulture));
}
- public static DataVector AggreatedMean(List<PartialMean> means)
+ public static PartialMean AggregatedPartialMean(List<PartialMean> means)
{
if (means == null || means.Count == 0)
{
@@ -64,7 +64,12 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
{
mean = mean.CombinePartialMean(means[i]);
}
- return mean.Mean;
+ return mean;
+ }
+
+ public static DataVector AggregatedMean(List<PartialMean> means)
+ {
+ return AggregatedPartialMean(means).Mean;
}
public static List<DataVector> AggregateTrueMeansToFileSystem(int partitionsNum, int clustersNum, string executionDirectory)
@@ -93,7 +98,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
for (int i = 0; i < clustersNum; i++)
{
List<PartialMean> means = partialMeans.Where(m => m.Mean.Label == i).ToList();
- newCentroids.Add(PartialMean.AggreatedMean(means));
+ newCentroids.Add(PartialMean.AggregatedMean(means));
}
return newCentroids;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 0e42ef6..0afda40 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -40,24 +40,15 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
{
private const int K = 3;
private const int Partitions = 2;
- private const string SmallMouseDataFile = @"mouseData_small.csv";
- private const string MouseDataFile = @"mouseData.csv";
-
- private readonly bool _useSmallDataSet = false;
- private string _dataFile = MouseDataFile;
+ private const string DataFileNamePrefix = "KMeansInput-";
public TestKMeans()
{
- if (_useSmallDataSet)
- {
- _dataFile = SmallMouseDataFile;
- }
-
CleanUp();
Init();
}
- [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+ [Fact]
[Trait("Priority", "1")]
[Trait("Category", "FunctionalGated")]
[Trait("Description", "Test KMeans clustering with things directly run without reef")]
@@ -65,8 +56,10 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
public void TestKMeansOnDirectRunViaFileSystem()
{
int iteration = 0;
- string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
- List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, executionDirectory);
+ string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(),
+ string.Join("-", Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4)));
+ string dataFilePath = GenerateDataFileAndGetPath();
+ List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(dataFilePath, Partitions, K, executionDirectory);
// initialize all tasks
List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>();
@@ -105,9 +98,19 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
}
iteration++;
}
+
+ // cleanup workspace
+ try
+ {
+ Directory.Delete(executionDirectory, true);
+ }
+ catch (Exception)
+ {
+ // do not fail if clean up is unsuccessful
+ }
}
- [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+ [Fact]
[Trait("Priority", "1")]
[Trait("Category", "FunctionalGated")]
[Trait("Description", "Test KMeans clustering on reef local runtime with group communications")]
@@ -116,12 +119,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
{
IsOnLocalRuntime = true;
string testFolder = DefaultRuntimeFolder + TestId;
- TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder);
+ string dataFilePath = GenerateDataFileAndGetPath();
+
+ TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder);
ValidateSuccessForLocalRuntime(Partitions + 1, testFolder: testFolder);
CleanUp(testFolder);
}
- [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+ [Fact(Skip = "Requires Yarn Single Node")]
[Trait("Priority", "1")]
[Trait("Category", "FunctionalGated")]
[Trait("Description", "Test KMeans clustering on reef YARN runtime - one box")]
@@ -129,11 +134,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
public void TestKMeansOnYarnOneBoxWithGroupCommunications()
{
string testFolder = DefaultRuntimeFolder + TestId + "Yarn";
- TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder);
+ string dataFilePath = GenerateDataFileAndGetPath();
+ TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder);
Assert.NotNull("BreakPointChecker");
}
- private IConfiguration DriverConfiguration()
+ private IConfiguration DriverConfiguration(string dataFilePath)
{
int fanOut = 2;
int totalEvaluators = Partitions + 1;
@@ -144,7 +150,7 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class)
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class)
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class)
- .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, "DataFile:" + _dataFile)
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, dataFilePath)
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build())
.BindIntNamedParam<NumPartitions>(Partitions.ToString())
@@ -160,5 +166,33 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
return Configurations.Merge(driverConfig, groupCommunicationDriverConfig);
}
+
+ private static string GenerateDataFileAndGetPath()
+ {
+ string dataFilePath = Path.Combine(Path.GetTempPath(), DataFileNamePrefix + Guid.NewGuid().ToString("N").Substring(0, 4));
+
+ DataVector[] centroids =
+ {
+ new DataVector(new List<float> { +2f, 0f }, 0),
+ new DataVector(new List<float> { -2f, 0f }, 0),
+ new DataVector(new List<float> { +0f, 2f }, 0)
+ };
+
+ using (StreamWriter writer = new StreamWriter(File.OpenWrite(dataFilePath)))
+ {
+ Random rnd = new Random();
+ for (int i = 0; i < 10000; i++)
+ {
+ int label = rnd.Next(centroids.Length);
+ float x = Convert.ToSingle(centroids[label].Data[0] + rnd.NextDouble());
+ float y = Convert.ToSingle(centroids[label].Data[1] + rnd.NextDouble());
+
+ writer.WriteLine("{0},{1};{2}", x, y, label);
+ }
+ writer.Close();
+ }
+
+ return dataFilePath;
+ }
}
}