You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/11 23:58:09 UTC
[4/4] incubator-reef git commit: [REEF-150] Adding group
communication to REEF .Net
[REEF-150] Adding group communication to REEF .Net
This is to port Group Communication to REEF .Net in Apache Git
Add source code, examples and tests
Updated namespace to follow conventions
IIRA: Reef-150. (https://issues.apache.org/jira/browse/REEF-150)
Author: Julia Wang Email: jwang98052@yahoo.com
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0292caf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0292caf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0292caf1
Branch: refs/heads/master
Commit: 0292caf1437d61586d4e7ba1370710be833f5292
Parents: 7edb857
Author: Julia Wang <jw...@yahoo.com>
Authored: Tue Feb 10 19:16:14 2015 -0800
Committer: tmajest <tm...@microsoft.com>
Committed: Wed Feb 11 14:55:10 2015 -0800
----------------------------------------------------------------------
.../MachineLearning/KMeans/Centroids.cs | 44 ++
.../MachineLearning/KMeans/Constants.cs | 35 ++
.../KMeans/Contracts/CentroidsContract.cs | 49 ++
.../KMeans/Contracts/DataVectorContract.cs | 52 ++
.../KMeans/Contracts/PartialMeanContract.cs | 48 ++
.../Contracts/ProcessedResultsContract.cs | 57 +++
.../KMeans/DataPartitionCache.cs | 104 ++++
.../MachineLearning/KMeans/DataVector.cs | 260 ++++++++++
.../KMeans/KMeansConfiguratioinOptions.cs | 46 ++
.../KMeans/KMeansDriverHandlers.cs | 191 ++++++++
.../MachineLearning/KMeans/KMeansMasterTask.cs | 155 ++++++
.../MachineLearning/KMeans/KMeansSlaveTask.cs | 118 +++++
.../MachineLearning/KMeans/LegacyKMeansTask.cs | 113 +++++
.../MachineLearning/KMeans/PartialMean.cs | 124 +++++
.../MachineLearning/KMeans/ProcessedResults.cs | 54 +++
.../KMeans/codecs/CentroidsCodec.cs | 49 ++
.../KMeans/codecs/DataVectorCodec.cs | 46 ++
.../KMeans/codecs/ProcessedResultsCodec.cs | 57 +++
.../Org.Apache.REEF.Examples.csproj | 19 +
.../Group/Codec/GcmMessageProto.cs | 76 +++
.../Codec/GroupCommunicationMessageCodec.cs | 77 +++
.../Group/Config/MpiConfigurationOptions.cs | 71 +++
.../Group/Driver/ICommunicationGroupDriver.cs | 89 ++++
.../Group/Driver/IMpiDriver.cs | 93 ++++
.../Driver/Impl/CommunicationGroupDriver.cs | 260 ++++++++++
.../Driver/Impl/GroupCommunicationMessage.cs | 107 ++++
.../Group/Driver/Impl/MessageType.cs | 30 ++
.../Group/Driver/Impl/MpiDriver.cs | 239 +++++++++
.../Group/Driver/Impl/TaskStarter.cs | 135 ++++++
.../Group/Operators/IBroadcastReceiver.cs | 40 ++
.../Group/Operators/IBroadcastSender.cs | 40 ++
.../Group/Operators/IMpiOperator.cs | 49 ++
.../Group/Operators/IOperatorSpec.cs | 36 ++
.../Group/Operators/IReduceFunction.cs | 41 ++
.../Group/Operators/IReduceReceiver.cs | 46 ++
.../Group/Operators/IReduceSender.cs | 40 ++
.../Group/Operators/IScatterReceiver.cs | 41 ++
.../Group/Operators/IScatterSender.cs | 60 +++
.../Operators/Impl/BroadcastOperatorSpec.cs | 50 ++
.../Group/Operators/Impl/BroadcastReceiver.cs | 92 ++++
.../Group/Operators/Impl/BroadcastSender.cs | 98 ++++
.../Group/Operators/Impl/ReduceFunction.cs | 62 +++
.../Group/Operators/Impl/ReduceOperatorSpec.cs | 62 +++
.../Group/Operators/Impl/ReduceReceiver.cs | 100 ++++
.../Group/Operators/Impl/ReduceSender.cs | 97 ++++
.../Group/Operators/Impl/ScatterOperatorSpec.cs | 58 +++
.../Group/Operators/Impl/ScatterReceiver.cs | 101 ++++
.../Group/Operators/Impl/ScatterSender.cs | 112 +++++
.../Group/Operators/Impl/Sender.cs | 74 +++
.../Group/Task/ICommunicationGroupClient.cs | 90 ++++
.../Task/ICommunicationGroupNetworkObserver.cs | 49 ++
.../Group/Task/IMpiClient.cs | 44 ++
.../Group/Task/IMpiNetworkObserver.cs | 50 ++
.../Group/Task/Impl/CommunicationGroupClient.cs | 219 +++++++++
.../Impl/CommunicationGroupNetworkObserver.cs | 93 ++++
.../Group/Task/Impl/MpiClient.cs | 108 +++++
.../Group/Task/Impl/MpiNetworkObserver.cs | 109 +++++
.../Group/Task/Impl/NodeStruct.cs | 67 +++
.../Group/Task/Impl/OperatorTopology.cs | 484 +++++++++++++++++++
.../Group/Topology/FlatTopology.cs | 201 ++++++++
.../Group/Topology/ITopology.cs | 41 ++
.../Group/Topology/TaskNode.cs | 69 +++
.../Org.Apache.REEF.Network.csproj | 43 ++
.../Functional/ML/KMeans/TestKMeans.cs | 163 +++++++
.../BroadcastReduceDriver.cs | 187 +++++++
.../BroadcastReduceTest/BroadcastReduceTest.cs | 83 ++++
.../MPI/BroadcastReduceTest/MasterTask.cs | 89 ++++
.../MPI/BroadcastReduceTest/SlaveTask.cs | 80 +++
.../Functional/MPI/MpiTestConfig.cs | 36 ++
.../Functional/MPI/MpiTestConstants.cs | 33 ++
.../MPI/ScatterReduceTest/MasterTask.cs | 72 +++
.../ScatterReduceTest/ScatterReduceDriver.cs | 168 +++++++
.../MPI/ScatterReduceTest/ScatterReduceTest.cs | 80 +++
.../MPI/ScatterReduceTest/SlaveTask.cs | 67 +++
.../Org.Apache.REEF.Tests.csproj | 11 +
75 files changed, 6833 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Centroids.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Centroids.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Centroids.cs
new file mode 100644
index 0000000..0ceb4ed
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Centroids.cs
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class Centroids
+ {
+ public Centroids(List<DataVector> points)
+ {
+ Points = points;
+ }
+
+ public List<DataVector> Points { get; set; }
+
+ /// <summary>
+ /// helper function mostly used for logging
+ /// </summary>
+ /// <returns>the serialized string</returns>
+ public override string ToString()
+ {
+ return ByteUtilities.ByteArrarysToString(new CentroidsCodec().Encode(this));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Constants.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Constants.cs
new file mode 100644
index 0000000..526031e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Constants.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class Constants
+ {
+ public const string KMeansExecutionBaseDirectory = @"KMeans";
+ public const string DataDirectory = "data";
+ public const string PartialMeanFilePrefix = "partialMeans_";
+ public const string CentroidsFile = "centroids";
+ public const string MasterTaskId = "KMeansMasterTaskId";
+ public const string SlaveTaskIdPrefix = "KMeansSlaveTask_";
+ public const string KMeansCommunicationGroupName = "KMeansBroadcastReduceGroup";
+ public const string CentroidsBroadcastOperatorName = "CentroidsBroadcast";
+ public const string ControlMessageBroadcastOperatorName = "ControlMessageBroadcast";
+ public const string MeansReduceOperatorName = "MeansReduce";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/CentroidsContract.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/CentroidsContract.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/CentroidsContract.cs
new file mode 100644
index 0000000..8ea56c9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/CentroidsContract.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts
+{
+ [DataContract]
+ public class CentroidsContract
+ {
+ [DataMember]
+ public List<DataVectorContract> DataVectorContracts { get; set; }
+
+ public static CentroidsContract Create(Centroids centroids)
+ {
+ List<DataVectorContract> dataVectorContracts =
+ centroids.Points.Select(DataVectorContract.Create).ToList();
+
+ return new CentroidsContract { DataVectorContracts = dataVectorContracts };
+ }
+
+ public Centroids ToCentroids()
+ {
+ List<DataVector> dataVectors = DataVectorContracts
+ .Select(dv => dv.ToDataVector())
+ .ToList();
+
+ return new Centroids(dataVectors);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/DataVectorContract.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/DataVectorContract.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/DataVectorContract.cs
new file mode 100644
index 0000000..1e41944
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/DataVectorContract.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts
+{
+ [DataContract]
+ public class DataVectorContract
+ {
+ [DataMember]
+ public List<float> Data { get; private set; }
+
+ [DataMember]
+ public int Label { get; private set; }
+
+ [DataMember]
+ public int Dimension { get; private set; }
+
+ public static DataVectorContract Create(DataVector dataVector)
+ {
+ return new DataVectorContract()
+ {
+ Data = dataVector.Data,
+ Label = dataVector.Label,
+ Dimension = dataVector.Dimension
+ };
+ }
+
+ public DataVector ToDataVector()
+ {
+ return new DataVector(Data, Label);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/PartialMeanContract.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/PartialMeanContract.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/PartialMeanContract.cs
new file mode 100644
index 0000000..b62f8ff
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/PartialMeanContract.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts
+{
+ [DataContract]
+ public class PartialMeanContract
+ {
+ [DataMember]
+ public DataVectorContract DataVectContract { get; set; }
+
+ [DataMember]
+ public int Size { get; set; }
+
+ public static PartialMeanContract Create(PartialMean partialMean)
+ {
+ return new PartialMeanContract
+ {
+ DataVectContract = DataVectorContract.Create(partialMean.Mean),
+ Size = partialMean.Size
+ };
+ }
+
+ public PartialMean ToPartialMean()
+ {
+ DataVector dataVector = DataVectContract.ToDataVector();
+ return new PartialMean(dataVector, Size);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/ProcessedResultsContract.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/ProcessedResultsContract.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/ProcessedResultsContract.cs
new file mode 100644
index 0000000..c78ee3c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/Contracts/ProcessedResultsContract.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts
+{
+ [DataContract]
+ public class ProcessedResultsContract
+ {
+ [DataMember]
+ public List<PartialMeanContract> PartialMeanContracts { get; set; }
+
+ [DataMember]
+ public float Loss { get; set; }
+
+ public static ProcessedResultsContract Create(ProcessedResults obj)
+ {
+ List<PartialMeanContract> partialMeansContracts = obj.Means
+ .Select(PartialMeanContract.Create)
+ .ToList();
+
+ return new ProcessedResultsContract
+ {
+ PartialMeanContracts = partialMeansContracts,
+ Loss = obj.Loss
+ };
+ }
+
+ public ProcessedResults ToProcessedResults()
+ {
+ List<PartialMean> partialMeans = PartialMeanContracts
+ .Select(contract => contract.ToPartialMean())
+ .ToList();
+
+ return new ProcessedResults(partialMeans, Loss);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
new file mode 100644
index 0000000..91c3ed0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ // TODO: we should outsource some of the functionalites to a data loader implemenation
+ public class DataPartitionCache : IService
+ {
+ private static readonly Logger _Logger = Logger.GetLogger(typeof(DataPartitionCache));
+
+ [Inject]
+ public DataPartitionCache(
+ [Parameter(Value = typeof(PartitionIndex))] int partition,
+ [Parameter(Value = typeof(KMeansConfiguratioinOptions.ExecutionDirectory))] string executionDirectory)
+ {
+ Partition = partition;
+ if (Partition < 0)
+ {
+ _Logger.Log(Level.Info, "no data to load since partition = " + Partition);
+ }
+ else
+ {
+ _Logger.Log(Level.Info, "loading data for partition " + Partition);
+ DataVectors = loadData(partition, executionDirectory);
+ }
+ }
+
+ public List<DataVector> DataVectors { get; set; }
+
+ public int Partition { get; set; }
+
+ // read initial data from file and marked it as unlabeled (not associated with any centroid)
+ public static List<DataVector> ReadDataFile(string path, char seperator = ',')
+ {
+ List<DataVector> data = new List<DataVector>();
+ FileStream file = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
+ using (StreamReader reader = new StreamReader(file))
+ {
+ while (!reader.EndOfStream)
+ {
+ string line = reader.ReadLine();
+ if (!string.IsNullOrWhiteSpace(line))
+ {
+ data.Add(DataVector.FromString(line));
+ }
+ }
+ reader.Close();
+ }
+
+ return data;
+ }
+
+ public void LabelData(Centroids centroids)
+ {
+ foreach (DataVector vector in DataVectors)
+ {
+ float minimumDistance = float.MaxValue;
+ foreach (DataVector centroid in centroids.Points)
+ {
+ float d = vector.DistanceTo(centroid);
+ if (d < minimumDistance)
+ {
+ vector.Label = centroid.Label;
+ minimumDistance = d;
+ }
+ }
+ }
+ }
+
+ private List<DataVector> loadData(int partition, string executionDirectory)
+ {
+ string file = Path.Combine(executionDirectory, Constants.DataDirectory, partition.ToString(CultureInfo.InvariantCulture));
+ return ReadDataFile(file);
+ }
+
+ [NamedParameter("Data partition index", "partition", "")]
+ public class PartitionIndex : Name<int>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataVector.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataVector.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataVector.cs
new file mode 100644
index 0000000..fb6a16e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataVector.cs
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class DataVector
+ {
+ public DataVector(int dimension, int label = -1)
+ {
+ Dimension = dimension;
+ Data = Enumerable.Repeat((float)0, Dimension).ToList();
+ Label = label;
+ }
+
+ // unlablered data
+ public DataVector(List<float> data) : this(data, -1)
+ {
+ }
+
+ public DataVector(List<float> data, int label)
+ {
+ if (data == null || data.Count == 0)
+ {
+ throw new ArgumentNullException("data");
+ }
+ Dimension = data.Count;
+ Data = data;
+ Label = label;
+ }
+
+ public List<float> Data { get; set; }
+
+ public int Label { get; set; }
+
+ public int Dimension { get; set; }
+
+ public static float TotalDistance(List<DataVector> list1, List<DataVector> list2)
+ {
+ if (list1 == null || list2 == null || list1.Count == 0 || list2.Count == 0)
+ {
+ throw new ArgumentException("one of the input list is null or empty");
+ }
+ if (list1.Count != list2.Count)
+ {
+ throw new ArgumentException("list 1's dimensionality does not mach list 2.");
+ }
+ float distance = 0;
+ for (int i = 0; i < list1.Count; i++)
+ {
+ distance += list1[i].DistanceTo(list2[i]);
+ }
+ return distance;
+ }
+
+ public static DataVector Mean(List<DataVector> vectors)
+ {
+ if (vectors == null || vectors.Count == 0)
+ {
+ throw new ArgumentNullException("vectors");
+ }
+ DataVector mean = new DataVector(vectors[0].Dimension);
+ for (int i = 0; i < vectors.Count; i++)
+ {
+ mean = mean.Add(vectors[i], ignoreLabel: true);
+ }
+ return mean.Normalize(vectors.Count);
+ }
+
+ // shuffle data and write them to different partions (different files on disk for now)
+ public static List<DataVector> ShuffleDataAndGetInitialCentriods(string originalDataFile, int partitionsNum, int clustersNum, string executionDirectory)
+ {
+ List<DataVector> data = DataPartitionCache.ReadDataFile(originalDataFile);
+ // shuffle, not truely random, but sufficient for our purpose
+ data = data.OrderBy(a => Guid.NewGuid()).ToList();
+ string dataDirectory = Path.Combine(executionDirectory, Constants.DataDirectory);
+ // clean things up first
+ if (Directory.Exists(dataDirectory))
+ {
+ Directory.Delete(dataDirectory, true);
+ }
+ Directory.CreateDirectory(dataDirectory);
+
+ int residualCount = data.Count;
+ int batchSize = data.Count / partitionsNum;
+ for (int i = 0; i < partitionsNum; i++)
+ {
+ int linesCount = residualCount > batchSize ? batchSize : residualCount;
+ using (StreamWriter writer = new StreamWriter(
+ File.OpenWrite(Path.Combine(executionDirectory, Constants.DataDirectory, i.ToString(CultureInfo.InvariantCulture)))))
+ {
+ for (int j = i * batchSize; j < (i * batchSize) + linesCount; j++)
+ {
+ writer.WriteLine(data[j].ToString());
+ }
+ writer.Close();
+ }
+ }
+ return InitializeCentroids(clustersNum, data, executionDirectory);
+ }
+
+ public static void WriteToCentroidFile(List<DataVector> centroids, string executionDirectory)
+ {
+ string centroidFile = Path.Combine(executionDirectory, Constants.CentroidsFile);
+ File.Delete(centroidFile);
+ using (StreamWriter writer = new StreamWriter(File.OpenWrite(centroidFile)))
+ {
+ foreach (DataVector dataVector in centroids)
+ {
+ writer.WriteLine(dataVector.ToString());
+ }
+ writer.Close();
+ }
+ }
+
+ // TODO: replace with proper deserialization
+ public static DataVector FromString(string str)
+ {
+ if (string.IsNullOrWhiteSpace(str))
+ {
+ throw new ArgumentException("str");
+ }
+ string[] dataAndLable = str.Split(';');
+ if (dataAndLable == null || dataAndLable.Length > 2)
+ {
+ throw new ArgumentException("Cannot deserialize DataVector from string " + str);
+ }
+ int label = -1;
+ if (dataAndLable.Length == 2)
+ {
+ label = int.Parse(dataAndLable[1], CultureInfo.InvariantCulture);
+ }
+ List<float> data = dataAndLable[0].Split(',').Select(float.Parse).ToList();
+ return new DataVector(data, label);
+ }
+
+ // by default use squared euclidean disatance
+ // a naive implemenation without considering things like data normalization or overflow
+ // and it is not particular about efficiency
+ public float DistanceTo(DataVector other)
+ {
+ VectorsArithmeticPrecondition(other);
+ float d = 0;
+ for (int i = 0; i < Data.Count; i++)
+ {
+ float diff = Data[i] - other.Data[i];
+ d += diff * diff;
+ }
+ return d;
+ }
+
+ public float DistanceTo(List<DataVector> list)
+ {
+ float distance = 0;
+ for (int i = 0; i < list.Count; i++)
+ {
+ distance += this.DistanceTo(list[i]);
+ }
+ return distance;
+ }
+
+ public DataVector Add(DataVector other, bool ignoreLabel = false)
+ {
+ VectorsArithmeticPrecondition(other);
+ if (!ignoreLabel)
+ {
+ if (Label != other.Label)
+ {
+ throw new InvalidOperationException("by default cannot apply addition operation on data of different labels.");
+ }
+ }
+ List<float> sumData = new List<float>(Data);
+ for (int i = 0; i < Data.Count; i++)
+ {
+ sumData[i] += other.Data[i];
+ }
+ return new DataVector(sumData, ignoreLabel ? -1 : Label);
+ }
+
+ public DataVector Normalize(float normalizationFactor)
+ {
+ if (normalizationFactor == 0)
+ {
+ throw new InvalidOperationException("normalizationFactor is zero");
+ }
+ DataVector result = new DataVector(Data, Label);
+ for (int i = 0; i < Data.Count; i++)
+ {
+ result.Data[i] /= normalizationFactor;
+ }
+ return result;
+ }
+
+ public DataVector MultiplyScalar(float scalar)
+ {
+ DataVector result = new DataVector(Data, Label);
+ for (int i = 0; i < Data.Count; i++)
+ {
+ result.Data[i] *= scalar;
+ }
+ return result;
+ }
+
+ // TODO: replace with proper serialization
+ public override string ToString()
+ {
+ return string.Join(",", Data.Select(i => i.ToString(CultureInfo.InvariantCulture)).ToArray()) + ";" + Label;
+ }
+
+ // normally centroids are picked as random points from the vector space
+ // here we just pick K random data samples
+ private static List<DataVector> InitializeCentroids(int clustersNum, List<DataVector> data, string executionDirectory)
+ {
+ // again we used the not-some-random guid trick,
+ // not truly random and not quite efficient, but easy to implement as v1
+ List<DataVector> centroids = data.OrderBy(a => Guid.NewGuid()).Take(clustersNum).ToList();
+
+ // add label to centroids
+ for (int i = 0; i < centroids.Count; i++)
+ {
+ centroids[i].Label = i;
+ }
+ WriteToCentroidFile(centroids, executionDirectory);
+ return centroids;
+ }
+
+ private void VectorsArithmeticPrecondition(DataVector other)
+ {
+ if (other == null || other.Data == null)
+ {
+ throw new ArgumentNullException("other");
+ }
+ if (Data.Count != other.Data.Count)
+ {
+ throw new InvalidOperationException("vector dimentionality mismatch");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansConfiguratioinOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansConfiguratioinOptions.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansConfiguratioinOptions.cs
new file mode 100644
index 0000000..aeac77d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansConfiguratioinOptions.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class KMeansConfiguratioinOptions
+ {
+ [NamedParameter("Number of clusters", "K", "0")]
+ public class K : Name<int>
+ {
+ }
+
+ /// <summary>
+ /// This is for loading the initial data samples from file
+ /// currently it is assumed to load from local disk, we can easily extend this to
+ /// be an url that point to cloud storage, and have things downloaded from blob storage instead
+ /// </summary>
+ [NamedParameter("Directory for storing all execution data", "DD")]
+ public class ExecutionDirectory : Name<string>
+ {
+ }
+
+ [NamedParameter("Number of Evaluators")]
+ public class TotalNumEvaluators : Name<int>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
new file mode 100644
index 0000000..235a268
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class KMeansDriverHandlers :
+ IStartHandler,
+ IObserver<IEvaluatorRequestor>,
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>
+ {
+ private static readonly Logger _Logger = Logger.GetLogger(typeof(KMeansDriverHandlers));
+ private readonly object _lockObj = new object();
+ private string _executionDirectory;
+
+ // TODO: we may want to make this injectable
+ private int _partitionsNumber = 2;
+ private int _clustersNumber = 3;
+ private int _totalEvaluators;
+ private int _partitionInex = 0;
+ private IMpiDriver _mpiDriver;
+ private ICommunicationGroupDriver _commGroup;
+ private TaskStarter _mpiTaskStarter;
+
+ [Inject]
+ public KMeansDriverHandlers()
+ {
+ Identifier = "KMeansDriverId";
+ _executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
+ ISet<string> arguments = ClrHandlerHelper.GetCommandLineArguments();
+ string dataFile = arguments.Single(a => a.StartsWith("DataFile", StringComparison.Ordinal)).Split(':')[1];
+ DataVector.ShuffleDataAndGetInitialCentriods(
+ Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", dataFile),
+ _partitionsNumber,
+ _clustersNumber,
+ _executionDirectory);
+
+ _totalEvaluators = _partitionsNumber + 1;
+ _mpiDriver = new MpiDriver(Identifier, Constants.MasterTaskId, new AvroConfigurationSerializer());
+
+ _commGroup = _mpiDriver.NewCommunicationGroup(
+ Constants.KMeansCommunicationGroupName,
+ _totalEvaluators)
+ .AddBroadcast(Constants.CentroidsBroadcastOperatorName, new BroadcastOperatorSpec<Centroids>(Constants.MasterTaskId, new CentroidsCodec()))
+ .AddBroadcast(Constants.ControlMessageBroadcastOperatorName, new BroadcastOperatorSpec<ControlMessage>(Constants.MasterTaskId, new ControlMessageCodec()))
+ .AddReduce(Constants.MeansReduceOperatorName, new ReduceOperatorSpec<ProcessedResults>(Constants.MasterTaskId, new ProcessedResultsCodec(), new KMeansMasterTask.AggregateMeans()))
+ .Build();
+ _mpiTaskStarter = new TaskStarter(_mpiDriver, _totalEvaluators);
+
+ CreateClassHierarchy();
+ }
+
+ public string Identifier { get; set; }
+
+ public void OnNext(IEvaluatorRequestor evalutorRequestor)
+ {
+ int evaluatorsNumber = _totalEvaluators;
+ int memory = 2048;
+ int core = 1;
+ EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, core);
+
+ evalutorRequestor.Submit(request);
+ }
+
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+ {
+ IConfiguration contextConfiguration = _mpiDriver.GetContextConfiguration();
+
+ int partitionNum;
+ if (_mpiDriver.IsMasterContextConfiguration(contextConfiguration))
+ {
+ partitionNum = -1;
+ }
+ else
+ {
+ lock (_lockObj)
+ {
+ partitionNum = _partitionInex;
+ _partitionInex++;
+ }
+ }
+
+ IConfiguration gcServiceConfiguration = _mpiDriver.GetServiceConfiguration();
+
+ IConfiguration commonServiceConfiguration = TangFactory.GetTang().NewConfigurationBuilder(gcServiceConfiguration)
+ .BindNamedParameter<DataPartitionCache.PartitionIndex, int>(GenericType<DataPartitionCache.PartitionIndex>.Class, partitionNum.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<KMeansConfiguratioinOptions.ExecutionDirectory, string>(GenericType<KMeansConfiguratioinOptions.ExecutionDirectory>.Class, _executionDirectory)
+ .BindNamedParameter<KMeansConfiguratioinOptions.TotalNumEvaluators, int>(GenericType<KMeansConfiguratioinOptions.TotalNumEvaluators>.Class, _totalEvaluators.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<KMeansConfiguratioinOptions.K, int>(GenericType<KMeansConfiguratioinOptions.K>.Class, _clustersNumber.ToString(CultureInfo.InvariantCulture))
+ .Build();
+
+ IConfiguration dataCacheServiceConfiguration = ServiceConfiguration.ConfigurationModule
+ .Set(ServiceConfiguration.Services, GenericType<DataPartitionCache>.Class)
+ .Build();
+
+ allocatedEvaluator.SubmitContextAndService(contextConfiguration, Configurations.Merge(commonServiceConfiguration, dataCacheServiceConfiguration));
+ }
+
+ public void OnNext(IActiveContext activeContext)
+ {
+ IConfiguration taskConfiguration;
+
+ if (_mpiDriver.IsMasterTaskContext(activeContext))
+ {
+ // Configure Master Task
+ taskConfiguration = TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, Constants.MasterTaskId)
+ .Set(TaskConfiguration.Task, GenericType<KMeansMasterTask>.Class)
+ .Build();
+
+ _commGroup.AddTask(Constants.MasterTaskId);
+ }
+ else
+ {
+ string slaveTaskId = Constants.SlaveTaskIdPrefix + activeContext.Id;
+ // Configure Slave Task
+ taskConfiguration = TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, Constants.SlaveTaskIdPrefix + activeContext.Id)
+ .Set(TaskConfiguration.Task, GenericType<KMeansSlaveTask>.Class)
+ .Build();
+
+ _commGroup.AddTask(slaveTaskId);
+ }
+ _mpiTaskStarter.QueueTask(taskConfiguration, activeContext);
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ private void CreateClassHierarchy()
+ {
+ HashSet<string> clrDlls = new HashSet<string>();
+ clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+ clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+ clrDlls.Add(typeof(LegacyKMeansTask).Assembly.GetName().Name);
+ clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+ clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+ ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
new file mode 100644
index 0000000..3dd7adb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class KMeansMasterTask : ITask
+ {
+ private static Logger _logger = Logger.GetLogger(typeof(KMeansMasterTask));
+
+ private int _iteration = 0;
+
+ private ICommunicationGroupClient _commGroup;
+ private IBroadcastSender<Centroids> _dataBroadcastSender;
+ private IBroadcastSender<ControlMessage> _controlBroadcastSender;
+ private IReduceReceiver<ProcessedResults> _meansReducerReceiver;
+ private string _kMeansExecutionDirectory;
+ private Centroids _centroids;
+ private bool _isInitialIteration;
+
+ [Inject]
+ public KMeansMasterTask(
+ [Parameter(typeof(KMeansConfiguratioinOptions.TotalNumEvaluators))] int totalNumEvaluators,
+ [Parameter(Value = typeof(KMeansConfiguratioinOptions.ExecutionDirectory))] string executionDirectory,
+ IMpiClient mpiClient)
+ {
+ using (_logger.LogFunction("KMeansMasterTask"))
+ {
+ if (totalNumEvaluators <= 1)
+ {
+ throw new ArgumentException("There must be more than 1 Evaluators in total, but the total evaluators number provided is " + totalNumEvaluators);
+ }
+ _commGroup = mpiClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName);
+ _dataBroadcastSender = _commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName);
+ _meansReducerReceiver = _commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName);
+ _controlBroadcastSender = _commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName);
+ _kMeansExecutionDirectory = executionDirectory;
+ _isInitialIteration = true;
+ }
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ // TODO: this belongs to dedicated dataloader layer, will refactor once we have that
+ string centroidFile = Path.Combine(_kMeansExecutionDirectory, Constants.CentroidsFile);
+ _centroids = new Centroids(DataPartitionCache.ReadDataFile(centroidFile));
+
+ float loss = float.MaxValue;
+ float newLoss;
+
+ while (true)
+ {
+ if (_isInitialIteration)
+ {
+ // broadcast initial centroids to all slave nodes
+ _logger.Log(Level.Info, "Broadcasting INITIAL centroids to all slave nodes: " + _centroids);
+ _isInitialIteration = false;
+ }
+ else
+ {
+ ProcessedResults results = _meansReducerReceiver.Reduce();
+ _centroids = new Centroids(results.Means.Select(m => m.Mean).ToList());
+ _logger.Log(Level.Info, "Broadcasting new centroids to all slave nodes: " + _centroids);
+ newLoss = results.Loss;
+ _logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "The new loss value {0} at iteration {1} ", newLoss, _iteration));
+ if (newLoss > loss)
+ {
+ _controlBroadcastSender.Send(ControlMessage.STOP);
+ throw new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "The new loss {0} is larger than previous loss {1}, while loss function must be monotonically decreasing across iterations", newLoss, loss));
+ }
+ else if (newLoss.Equals(loss))
+ {
+ _logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "KMeans clustering has converged with a loss value of {0} at iteration {1} ", newLoss, _iteration));
+ break;
+ }
+ else
+ {
+ loss = newLoss;
+ }
+ }
+ _controlBroadcastSender.Send(ControlMessage.RECEIVE);
+ _dataBroadcastSender.Send(_centroids);
+ _iteration++;
+ }
+ _controlBroadcastSender.Send(ControlMessage.STOP);
+ return null;
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public class AggregateMeans : IReduceFunction<ProcessedResults>
+ {
+ [Inject]
+ public AggregateMeans()
+ {
+ }
+
+ public ProcessedResults Reduce(IEnumerable<ProcessedResults> elements)
+ {
+ List<PartialMean> aggregatedMeans = new List<PartialMean>();
+ List<PartialMean> totalList = new List<PartialMean>();
+ float aggregatedLoss = 0;
+
+ foreach (var element in elements)
+ {
+ totalList.AddRange(element.Means);
+ aggregatedLoss += element.Loss;
+ }
+
+ // we infer the value of K from the labeled data
+ int clustersNum = totalList.Max(p => p.Mean.Label) + 1;
+ for (int i = 0; i < clustersNum; i++)
+ {
+ List<PartialMean> means = totalList.Where(m => m.Mean.Label == i).ToList();
+ aggregatedMeans.Add(new PartialMean(PartialMean.AggreatedMean(means), means.Count));
+ }
+
+ ProcessedResults returnMeans = new ProcessedResults(aggregatedMeans, aggregatedLoss);
+
+ _logger.Log(Level.Info, "The true means aggregated by the reduce function: " + returnMeans);
+ return returnMeans;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
new file mode 100644
index 0000000..a36fbcb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class KMeansSlaveTask : ITask
+ {
+ private static Logger _logger = Logger.GetLogger(typeof(KMeansSlaveTask));
+ private int _clustersNum;
+ private IMpiClient _mpiClient;
+ private ICommunicationGroupClient _commGroup;
+ private IBroadcastReceiver<Centroids> _dataBroadcastReceiver;
+ private IBroadcastReceiver<ControlMessage> _controlBroadcastReceiver;
+ private IReduceSender<ProcessedResults> _partialMeansSender;
+ private DataPartitionCache _dataPartition;
+
+ [Inject]
+ public KMeansSlaveTask(
+ DataPartitionCache dataPartition,
+ [Parameter(typeof(KMeansConfiguratioinOptions.TotalNumEvaluators))] int clustersNumber,
+ IMpiClient mpiClient)
+ {
+ using (_logger.LogFunction("KMeansSlaveTask::KMeansSlaveTask"))
+ {
+ _dataPartition = dataPartition;
+ _mpiClient = mpiClient;
+ _clustersNum = clustersNumber;
+ _commGroup = _mpiClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName);
+ _dataBroadcastReceiver = _commGroup.GetBroadcastReceiver<Centroids>(Constants.CentroidsBroadcastOperatorName);
+ _partialMeansSender = _commGroup.GetReduceSender<ProcessedResults>(Constants.MeansReduceOperatorName);
+ _controlBroadcastReceiver = _commGroup.GetBroadcastReceiver<ControlMessage>(Constants.ControlMessageBroadcastOperatorName);
+ }
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ while (true)
+ {
+ if (_controlBroadcastReceiver.Receive() == ControlMessage.STOP)
+ {
+ break;
+ }
+ Centroids centroids = _dataBroadcastReceiver.Receive();
+ // we compute the loss here before data is relabled, this does not reflect the latest clustering result at the end of current iteration,
+ // but it will save another round of group communications in each iteration
+ _logger.Log(Level.Info, "Received centroids from master: " + centroids);
+ _dataPartition.LabelData(centroids);
+ ProcessedResults partialMeans = new ProcessedResults(ComputePartialMeans(), ComputeLossFunction(centroids, _dataPartition.DataVectors));
+ _logger.Log(Level.Info, "Sending partial means: " + partialMeans);
+ _partialMeansSender.Send(partialMeans);
+ }
+
+ return null;
+ }
+
+ public void Dispose()
+ {
+ _mpiClient.Dispose();
+ }
+
+ private List<PartialMean> ComputePartialMeans()
+ {
+ _logger.Log(Level.Verbose, "cluster number " + _clustersNum);
+ List<PartialMean> partialMeans = new PartialMean[_clustersNum].ToList();
+ for (int i = 0; i < _clustersNum; i++)
+ {
+ List<DataVector> slices = _dataPartition.DataVectors.Where(d => d.Label == i).ToList();
+ DataVector average = new DataVector(_dataPartition.DataVectors[0].Dimension);
+
+ if (slices.Count > 1)
+ {
+ average = DataVector.Mean(slices);
+ }
+ average.Label = i;
+ partialMeans[i] = new PartialMean(average, slices.Count);
+ _logger.Log(Level.Info, "Adding to partial means list: " + partialMeans[i]);
+ }
+ return partialMeans;
+ }
+
+ private float ComputeLossFunction(Centroids centroids, List<DataVector> labeledData)
+ {
+ float d = 0;
+ for (int i = 0; i < centroids.Points.Count; i++)
+ {
+ DataVector centroid = centroids.Points[i];
+ List<DataVector> slice = labeledData.Where(v => v.Label == centroid.Label).ToList();
+ d += centroid.DistanceTo(slice);
+ }
+ return d;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs
new file mode 100644
index 0000000..b674d84
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ /// <summary>
+ /// This is the legacy KmeansTask implmented when group communications are not available
+ /// It is still being used for plain KMeans without REEF, we probably want to refactor it later
+ /// to reflect that
+ /// </summary>
+ public class LegacyKMeansTask
+ {
+ private int _clustersNum;
+ private DataPartitionCache _dataPartition;
+ private string _kMeansExecutionDirectory;
+
+ private Centroids _centroids;
+ private List<PartialMean> _partialMeans;
+
+ [Inject]
+ public LegacyKMeansTask(
+ DataPartitionCache dataPartition,
+ [Parameter(Value = typeof(KMeansConfiguratioinOptions.K))] int clustersNumber,
+ [Parameter(Value = typeof(KMeansConfiguratioinOptions.ExecutionDirectory))] string executionDirectory)
+ {
+ _dataPartition = dataPartition;
+ _clustersNum = clustersNumber;
+ _kMeansExecutionDirectory = executionDirectory;
+ if (_centroids == null)
+ {
+ string centroidFile = Path.Combine(_kMeansExecutionDirectory, Constants.CentroidsFile);
+ _centroids = new Centroids(DataPartitionCache.ReadDataFile(centroidFile));
+ }
+ }
+
+ public static float ComputeLossFunction(List<DataVector> centroids, List<DataVector> labeledData)
+ {
+ float d = 0;
+ for (int i = 0; i < centroids.Count; i++)
+ {
+ DataVector centroid = centroids[i];
+ List<DataVector> slice = labeledData.Where(v => v.Label == centroid.Label).ToList();
+ d += centroid.DistanceTo(slice);
+ }
+ return d;
+ }
+
+ public byte[] CallWithWritingToFileSystem(byte[] memento)
+ {
+ string centroidFile = Path.Combine(_kMeansExecutionDirectory, Constants.CentroidsFile);
+ _centroids = new Centroids(DataPartitionCache.ReadDataFile(centroidFile));
+
+ _dataPartition.LabelData(_centroids);
+ _partialMeans = ComputePartialMeans();
+
+ // should be replaced with MPI
+ using (StreamWriter writer = new StreamWriter(
+ File.OpenWrite(Path.Combine(_kMeansExecutionDirectory, Constants.DataDirectory, Constants.PartialMeanFilePrefix + _dataPartition.Partition))))
+ {
+ for (int i = 0; i < _partialMeans.Count; i++)
+ {
+ writer.WriteLine(_partialMeans[i].ToString());
+ }
+ writer.Close();
+ }
+
+ return null;
+ }
+
+ public List<PartialMean> ComputePartialMeans()
+ {
+ List<PartialMean> partialMeans = new PartialMean[_clustersNum].ToList();
+ for (int i = 0; i < _clustersNum; i++)
+ {
+ List<DataVector> slices = _dataPartition.DataVectors.Where(d => d.Label == i).ToList();
+ DataVector average = new DataVector(_dataPartition.DataVectors[0].Dimension);
+
+ if (slices.Count > 1)
+ {
+ average = DataVector.Mean(slices);
+ }
+ average.Label = i;
+ partialMeans[i] = new PartialMean(average, slices.Count);
+ }
+ return partialMeans;
+ }
+
+ public void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
new file mode 100644
index 0000000..6f44167
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ public class PartialMean
+ {
+ public PartialMean(DataVector vector, int size)
+ {
+ Mean = vector;
+ Size = size;
+ }
+
+ public PartialMean()
+ {
+ }
+
+ public DataVector Mean { get; set; }
+
+ public int Size { get; set; }
+
+ public static PartialMean FromString(string str)
+ {
+ if (string.IsNullOrWhiteSpace(str))
+ {
+ throw new ArgumentException("str");
+ }
+ string[] parts = str.Split('#');
+ if (parts == null || parts.Length != 2)
+ {
+ throw new ArgumentException("Cannot deserialize PartialMean from string " + str);
+ }
+ return new PartialMean(DataVector.FromString(parts[0]), int.Parse(parts[1], CultureInfo.InvariantCulture));
+ }
+
+ public static DataVector AggreatedMean(List<PartialMean> means)
+ {
+ if (means == null || means.Count == 0)
+ {
+ throw new ArgumentException("means");
+ }
+ PartialMean mean = means[0];
+ for (int i = 1; i < means.Count; i++)
+ {
+ mean = mean.CombinePartialMean(means[i]);
+ }
+ return mean.Mean;
+ }
+
+ public static List<DataVector> AggregateTrueMeansToFileSystem(int partitionsNum, int clustersNum, string executionDirectory)
+ {
+ List<PartialMean> partialMeans = new List<PartialMean>();
+ for (int i = 0; i < partitionsNum; i++)
+ {
+ // should be replaced with MPI
+ string path = Path.Combine(executionDirectory, Constants.DataDirectory, Constants.PartialMeanFilePrefix + i.ToString(CultureInfo.InvariantCulture));
+ FileStream file = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
+ using (StreamReader reader = new StreamReader(file))
+ {
+ int index = 0;
+ while (!reader.EndOfStream)
+ {
+ string line = reader.ReadLine();
+ if (index++ < clustersNum)
+ {
+ partialMeans.Add(PartialMean.FromString(line));
+ }
+ }
+ reader.Close();
+ }
+ }
+ List<DataVector> newCentroids = new List<DataVector>();
+ for (int i = 0; i < clustersNum; i++)
+ {
+ List<PartialMean> means = partialMeans.Where(m => m.Mean.Label == i).ToList();
+ newCentroids.Add(PartialMean.AggreatedMean(means));
+ }
+ return newCentroids;
+ }
+
+ public override string ToString()
+ {
+ return Mean.ToString() + "#" + Size;
+ }
+
+ private PartialMean CombinePartialMean(PartialMean other)
+ {
+ PartialMean aggreatedMean = new PartialMean();
+ if (other == null)
+ {
+ throw new ArgumentNullException("other");
+ }
+ if (Mean.Label != other.Mean.Label)
+ {
+ throw new ArgumentException("cannot combine partial means with different labels");
+ }
+ aggreatedMean.Size = Size + other.Size;
+ aggreatedMean.Mean = (Mean.MultiplyScalar(Size).Add(other.Mean.MultiplyScalar(other.Size))).Normalize(aggreatedMean.Size);
+ return aggreatedMean;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/ProcessedResults.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/ProcessedResults.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/ProcessedResults.cs
new file mode 100644
index 0000000..3e3394f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/ProcessedResults.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
+{
+ /// <summary>
+ /// ProcessedResults includes a list of "PartialMeans" and "Partial loss", but it can be used to denote
+ /// the "whole" means as well, aggreated from all PartialMeans
+ /// </summary>
+ public class ProcessedResults
+ {
+ public ProcessedResults(List<PartialMean> means, float loss)
+ {
+ Means = means;
+ Loss = loss;
+ }
+
+ public List<PartialMean> Means { get; set; }
+
+ /// <summary>
+ /// the loss for current slice computed from
+ /// </summary>
+ public float Loss { get; set; }
+
+ /// <summary>
+ /// helper function mostly used for logging
+ /// </summary>
+ /// <returns>seralized string</returns>
+ public override string ToString()
+ {
+ return ByteUtilities.ByteArrarysToString(new ProcessedResultsCodec().Encode(this));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/CentroidsCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/CentroidsCodec.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/CentroidsCodec.cs
new file mode 100644
index 0000000..19480b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/CentroidsCodec.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs
+{
+ /// <summary>
+ /// Used to serialize and deserialize Centroids objects.
+ /// </summary>
+ public class CentroidsCodec : ICodec<Centroids>
+ {
+ [Inject]
+ public CentroidsCodec()
+ {
+ }
+
+ public byte[] Encode(Centroids centroids)
+ {
+ CentroidsContract contract = CentroidsContract.Create(centroids);
+ return AvroUtils.AvroSerialize(contract);
+ }
+
+ public Centroids Decode(byte[] data)
+ {
+ CentroidsContract contract = AvroUtils.AvroDeserialize<CentroidsContract>(data);
+ return contract.ToCentroids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/DataVectorCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/DataVectorCodec.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/DataVectorCodec.cs
new file mode 100644
index 0000000..820162e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/DataVectorCodec.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Examples.MachineLearning.KMeans.Contracts;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs
+{
+ public class DataVectorCodec : ICodec<DataVector>
+ {
+ [Inject]
+ public DataVectorCodec()
+ {
+ }
+
+ public byte[] Encode(DataVector obj)
+ {
+ DataVectorContract contract = DataVectorContract.Create(obj);
+ return AvroUtils.AvroSerialize(contract);
+ }
+
+ public DataVector Decode(byte[] data)
+ {
+ DataVectorContract contract = AvroUtils.AvroDeserialize<DataVectorContract>(data);
+ return contract.ToDataVector();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/ProcessedResultsCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/ProcessedResultsCodec.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/ProcessedResultsCodec.cs
new file mode 100644
index 0000000..8166030
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/codecs/ProcessedResultsCodec.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs
+{
+ /// <summary>
+ /// TODO: use proper avro scheme to do encode/decode
+ /// </summary>
+ public class ProcessedResultsCodec : ICodec<ProcessedResults>
+ {
+ [Inject]
+ public ProcessedResultsCodec()
+ {
+ }
+
+ public byte[] Encode(ProcessedResults results)
+ {
+ return ByteUtilities.StringToByteArrays(results.Loss + "+" + string.Join("@", results.Means.Select(m => m.ToString())));
+ }
+
+ public ProcessedResults Decode(byte[] data)
+ {
+ string[] parts = ByteUtilities.ByteArrarysToString(data).Split('+');
+ if (parts.Count() != 2)
+ {
+ throw new ArgumentException("cannot deserialize from" + ByteUtilities.ByteArrarysToString(data));
+ }
+ float loss = float.Parse(parts[0], CultureInfo.InvariantCulture);
+ List<PartialMean> means = parts[1].Split('@').Select(PartialMean.FromString).ToList();
+ return new ProcessedResults(means, loss);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
index e2691c5..e3ca589 100644
--- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj
@@ -72,6 +72,7 @@ under the License.
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
+ <Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
@@ -95,6 +96,24 @@ under the License.
<Compile Include="HelloCLRBridge\Handlers\HelloStartHandler.cs" />
<Compile Include="HelloCLRBridge\Handlers\HelloTaskMessageHandler.cs" />
<Compile Include="HelloCLRBridge\HelloTraceListener.cs" />
+ <Compile Include="MachineLearning\KMeans\Centroids.cs" />
+ <Compile Include="MachineLearning\KMeans\codecs\CentroidsCodec.cs" />
+ <Compile Include="MachineLearning\KMeans\codecs\DataVectorCodec.cs" />
+ <Compile Include="MachineLearning\KMeans\codecs\ProcessedResultsCodec.cs" />
+ <Compile Include="MachineLearning\KMeans\Constants.cs" />
+ <Compile Include="MachineLearning\KMeans\Contracts\CentroidsContract.cs" />
+ <Compile Include="MachineLearning\KMeans\Contracts\DataVectorContract.cs" />
+ <Compile Include="MachineLearning\KMeans\Contracts\PartialMeanContract.cs" />
+ <Compile Include="MachineLearning\KMeans\Contracts\ProcessedResultsContract.cs" />
+ <Compile Include="MachineLearning\KMeans\DataPartitionCache.cs" />
+ <Compile Include="MachineLearning\KMeans\DataVector.cs" />
+ <Compile Include="MachineLearning\KMeans\KMeansConfiguratioinOptions.cs" />
+ <Compile Include="MachineLearning\KMeans\KMeansDriverHandlers.cs" />
+ <Compile Include="MachineLearning\KMeans\KMeansMasterTask.cs" />
+ <Compile Include="MachineLearning\KMeans\KMeansSlaveTask.cs" />
+ <Compile Include="MachineLearning\KMeans\LegacyKMeansTask.cs" />
+ <Compile Include="MachineLearning\KMeans\PartialMean.cs" />
+ <Compile Include="MachineLearning\KMeans\ProcessedResults.cs" />
<Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalActiveContextHandler.cs" />
<Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalAllocatedEvaluatorHandler.cs" />
<Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalEvaluatorRequestorHandler.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
new file mode 100644
index 0000000..8a5a726
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using ProtoBuf;
+
+namespace Org.Apache.REEF.Network.Group.Codec
+{
+ [ProtoContract]
+ public class GcmMessageProto
+ {
+ [ProtoMember(1)]
+ public byte[][] Data { get; set; }
+
+ [ProtoMember(2)]
+ public string OperatorName { get; set; }
+
+ [ProtoMember(3)]
+ public string GroupName { get; set; }
+
+ [ProtoMember(4)]
+ public string Source { get; set; }
+
+ [ProtoMember(5)]
+ public string Destination { get; set; }
+
+ [ProtoMember(6)]
+ public MessageType MsgType { get; set; }
+
+ public static GcmMessageProto Create(GroupCommunicationMessage gcm)
+ {
+ return new GcmMessageProto()
+ {
+ Data = gcm.Data,
+ OperatorName = gcm.OperatorName,
+ GroupName = gcm.GroupName,
+ Source = gcm.Source,
+ Destination = gcm.Destination,
+ MsgType = gcm.MsgType,
+ };
+ }
+
+ public GroupCommunicationMessage ToGcm()
+ {
+ return new GroupCommunicationMessage(
+ GroupName,
+ OperatorName,
+ Source,
+ Destination,
+ Data,
+ MsgType);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
new file mode 100644
index 0000000..a8f884f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Codec;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using ProtoBuf;
+
+namespace Org.Apache.REEF.Network.Group.Codec
+{
+ /// <summary>
+ /// Used to serialize GroupCommunicationMessages.
+ /// </summary>
+ public class GroupCommunicationMessageCodec : ICodec<GroupCommunicationMessage>
+ {
+ /// <summary>
+ /// Create a new GroupCommunicationMessageCodec.
+ /// </summary>
+ [Inject]
+ public GroupCommunicationMessageCodec()
+ {
+ }
+
+ /// <summary>
+ /// Serialize the GroupCommunicationObject into a byte array using Protobuf.
+ /// </summary>
+ /// <param name="obj">The object to serialize.</param>
+ /// <returns>The serialized GroupCommunicationMessage in byte array form</returns>
+ public byte[] Encode(GroupCommunicationMessage obj)
+ {
+ GcmMessageProto proto = GcmMessageProto.Create(obj);
+ using (var stream = new MemoryStream())
+ {
+ Serializer.Serialize(stream, proto);
+ return stream.ToArray();
+ }
+ }
+
+ /// <summary>
+ /// Deserialize the byte array into a GroupCommunicationMessage using Protobuf.
+ /// </summary>
+ /// <param name="data">The byte array to deserialize</param>
+ /// <returns>The deserialized GroupCommunicationMessage object.</returns>
+ public GroupCommunicationMessage Decode(byte[] data)
+ {
+ using (var stream = new MemoryStream(data))
+ {
+ GcmMessageProto proto = Serializer.Deserialize<GcmMessageProto>(stream);
+ return proto.ToGcm();
+ }
+ }
+ }
+}