You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/07/15 21:13:09 UTC

reef git commit: [REEF-1183] Enable KMeans .NET tests

Repository: reef
Updated Branches:
  refs/heads/master d0b675238 -> 83e5e8667


[REEF-1183] Enable KMeans .NET tests

This addressed the issue by
  * removing dependency on an obsolete file
  * adding code for on-the-fly input file generation
  * fixing KMeans tests to generate an input file before running and take it as input
  * fixing a bug by removing the default value for DataPartitionCache.PartitionIndex
  * fixing the bug of requesting only one evaluator
  * fixing the bug of miscalculating partial means in the algorithm
  * fixing the bug of using CodecConfiguration instead of CodecToStreamingCodecConfiguration

JIRA:
  [REEF-1183](https://issues.apache.org/jira/browse/REEF-1183)

Pull Request:
  This closes #1053


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/83e5e866
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/83e5e866
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/83e5e866

Branch: refs/heads/master
Commit: 83e5e8667320b133ca75e1763627f18af28307b7
Parents: d0b6752
Author: Jason (Joo Seong) Jeong <cu...@gmail.com>
Authored: Wed Jun 22 18:14:52 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Fri Jul 15 14:11:43 2016 -0700

----------------------------------------------------------------------
 .../KMeans/DataPartitionCache.cs                |  2 +-
 .../KMeans/KMeansDriverHandlers.cs              | 12 ++--
 .../MachineLearning/KMeans/KMeansMasterTask.cs  |  3 +-
 .../MachineLearning/KMeans/PartialMean.cs       | 11 ++-
 .../Functional/ML/KMeans/TestKMeans.cs          | 72 ++++++++++++++------
 5 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
index 4c60a0f..460cd0b 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             return ReadDataFile(file);
         }
 
-        [NamedParameter("Data partition index", "partition", "")]
+        [NamedParameter("Data partition index", "partition")]
         public class PartitionIndex : Name<int>
         {
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index e0e75f8..a266a62 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -73,9 +73,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             CommandLineArguments arguments)
         {
             _executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
-            string dataFile = arguments.Arguments.Single(a => a.StartsWith("DataFile", StringComparison.Ordinal)).Split(':')[1];
+            string dataFile = arguments.Arguments.First();
             DataVector.ShuffleDataAndGetInitialCentriods(
-                Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", dataFile),
+                dataFile,
                 numPartitions,
                 _clustersNumber,
                 _executionDirectory);
@@ -86,7 +86,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _evaluatorRequestor = evaluatorRequestor;
 
             _centroidCodecConf = CodecToStreamingCodecConfiguration<Centroids>.Conf
-                .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
+                .Set(CodecToStreamingCodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class)
                 .Build();
 
             IConfiguration dataConverterConfig1 = PipelineDataConverterConfiguration<Centroids>.Conf
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 .Build();
 
             _controlMessageCodecConf = CodecToStreamingCodecConfiguration<ControlMessage>.Conf
-                .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
+                .Set(CodecToStreamingCodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class)
                 .Build();
 
             IConfiguration dataConverterConfig2 = PipelineDataConverterConfiguration<ControlMessage>.Conf
@@ -102,7 +102,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 .Build();
 
             _processedResultsCodecConf = CodecToStreamingCodecConfiguration<ProcessedResults>.Conf
-                .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
+                .Set(CodecToStreamingCodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class)
                 .Build();
 
             IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf
@@ -187,7 +187,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
         public void OnNext(IDriverStarted value)
         {
-            var request = _evaluatorRequestor.NewBuilder().SetCores(1).SetMegabytes(2048).Build();
+            var request = _evaluatorRequestor.NewBuilder().SetNumber(_totalEvaluators).SetCores(1).SetMegabytes(2048).Build();
 
             _evaluatorRequestor.Submit(request);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
index c1b3e90..140897b 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
@@ -140,7 +140,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 for (int i = 0; i < clustersNum; i++)
                 {
                     List<PartialMean> means = totalList.Where(m => m.Mean.Label == i).ToList();
-                    aggregatedMeans.Add(new PartialMean(PartialMean.AggreatedMean(means), means.Count));
+                    PartialMean aggregatedPartialMean = PartialMean.AggregatedPartialMean(means);
+                    aggregatedMeans.Add(new PartialMean(aggregatedPartialMean.Mean, aggregatedPartialMean.Size));
                 }
 
                 ProcessedResults returnMeans = new ProcessedResults(aggregatedMeans, aggregatedLoss);

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
index afeda32..f923a61 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
@@ -53,7 +53,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             return new PartialMean(DataVector.FromString(parts[0]), int.Parse(parts[1], CultureInfo.InvariantCulture));
         }
 
-        public static DataVector AggreatedMean(List<PartialMean> means)
+        public static PartialMean AggregatedPartialMean(List<PartialMean> means)
         {
             if (means == null || means.Count == 0)
             {
@@ -64,7 +64,12 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             {
                 mean = mean.CombinePartialMean(means[i]);
             }
-            return mean.Mean;
+            return mean;
+        }
+
+        public static DataVector AggregatedMean(List<PartialMean> means)
+        {
+            return AggregatedPartialMean(means).Mean;
         }
 
         public static List<DataVector> AggregateTrueMeansToFileSystem(int partitionsNum, int clustersNum, string executionDirectory)
@@ -93,7 +98,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             for (int i = 0; i < clustersNum; i++)
             {
                 List<PartialMean> means = partialMeans.Where(m => m.Mean.Label == i).ToList();
-                newCentroids.Add(PartialMean.AggreatedMean(means));
+                newCentroids.Add(PartialMean.AggregatedMean(means));
             }
             return newCentroids;
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 0e42ef6..0afda40 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -40,24 +40,15 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
     {
         private const int K = 3;
         private const int Partitions = 2;
-        private const string SmallMouseDataFile = @"mouseData_small.csv";
-        private const string MouseDataFile = @"mouseData.csv";
-
-        private readonly bool _useSmallDataSet = false;
-        private string _dataFile = MouseDataFile;
+        private const string DataFileNamePrefix = "KMeansInput-";
 
         public TestKMeans()
         {
-            if (_useSmallDataSet)
-            {
-                _dataFile = SmallMouseDataFile;
-            }  
-
             CleanUp();
             Init();
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+        [Fact]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering with things directly run without reef")]
@@ -65,8 +56,10 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         public void TestKMeansOnDirectRunViaFileSystem()
         {
             int iteration = 0;
-            string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
-            List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, executionDirectory);
+            string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(),
+                string.Join("-", Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4)));
+            string dataFilePath = GenerateDataFileAndGetPath();
+            List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(dataFilePath, Partitions, K, executionDirectory);
             
             // initialize all tasks
             List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>();
@@ -105,9 +98,19 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
                 }
                 iteration++;
             }
+
+            // cleanup workspace
+            try
+            {
+                Directory.Delete(executionDirectory, true);
+            }
+            catch (Exception)
+            {
+                // do not fail if clean up is unsuccessful
+            }
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+        [Fact]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering on reef local runtime with group communications")]
@@ -116,12 +119,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         {
             IsOnLocalRuntime = true;
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder);
+            string dataFilePath = GenerateDataFileAndGetPath();
+
+            TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder);
             ValidateSuccessForLocalRuntime(Partitions + 1, testFolder: testFolder);
             CleanUp(testFolder);
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")]
+        [Fact(Skip = "Requires Yarn Single Node")]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering on reef YARN runtime - one box")]
@@ -129,11 +134,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         public void TestKMeansOnYarnOneBoxWithGroupCommunications()
         {
             string testFolder = DefaultRuntimeFolder + TestId + "Yarn";
-            TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder);
+            string dataFilePath = GenerateDataFileAndGetPath();
+            TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder);
             Assert.NotNull("BreakPointChecker");
         }
 
-        private IConfiguration DriverConfiguration()
+        private IConfiguration DriverConfiguration(string dataFilePath)
         {
             int fanOut = 2;
             int totalEvaluators = Partitions + 1;
@@ -144,7 +150,7 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
                     .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class)
                     .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class)
                     .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class)
-                    .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, "DataFile:" + _dataFile)
+                    .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, dataFilePath)
                     .Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
                     .Build())
                 .BindIntNamedParam<NumPartitions>(Partitions.ToString())
@@ -160,5 +166,33 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
 
             return Configurations.Merge(driverConfig, groupCommunicationDriverConfig);
         }
+
+        private static string GenerateDataFileAndGetPath()
+        {
+            string dataFilePath = Path.Combine(Path.GetTempPath(), DataFileNamePrefix + Guid.NewGuid().ToString("N").Substring(0, 4));
+
+            DataVector[] centroids =
+            {
+                new DataVector(new List<float> { +2f, 0f }, 0),
+                new DataVector(new List<float> { -2f, 0f }, 0),
+                new DataVector(new List<float> { +0f, 2f }, 0)
+            };
+
+            using (StreamWriter writer = new StreamWriter(File.OpenWrite(dataFilePath)))
+            {
+                Random rnd = new Random();
+                for (int i = 0; i < 10000; i++)
+                {
+                    int label = rnd.Next(centroids.Length);
+                    float x = Convert.ToSingle(centroids[label].Data[0] + rnd.NextDouble());
+                    float y = Convert.ToSingle(centroids[label].Data[1] + rnd.NextDouble());
+
+                    writer.WriteLine("{0},{1};{2}", x, y, label);
+                }
+                writer.Close();
+            }
+
+            return dataFilePath;
+        }
     }
 }