You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/05/31 16:55:36 UTC

[2/2] ignite git commit: IGNITE-5113: Implemented basic distributed/local kmeans clusterization algorithm.

IGNITE-5113: Implemented basic distributed/local kmeans clusterization algorithm.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b04b5800
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b04b5800
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b04b5800

Branch: refs/heads/master
Commit: b04b58005d32debc3ae54dd3b03289fad1a590bd
Parents: d0bddfa
Author: Artem Malykh <am...@gridgain.com>
Authored: Fri May 19 19:40:12 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Wed May 31 19:54:48 2017 +0300

----------------------------------------------------------------------
 modules/ml/pom.xml                              |  13 +
 .../main/java/org/apache/ignite/ml/Model.java   |  39 +++
 .../ml/clustering/BaseKMeansClusterer.java      |  98 ++++++
 .../apache/ignite/ml/clustering/Clusterer.java  |  32 ++
 .../ml/clustering/ClusterizationModel.java      |  29 ++
 .../clustering/KMeansDistributedClusterer.java  | 298 +++++++++++++++++++
 .../ml/clustering/KMeansLocalClusterer.java     | 174 +++++++++++
 .../ignite/ml/clustering/KMeansModel.java       |  79 +++++
 .../ignite/ml/clustering/WeightedClusterer.java |  38 +++
 .../ignite/ml/clustering/package-info.java      |  22 ++
 .../apache/ignite/ml/math/DistanceMeasure.java  |  39 +++
 .../ignite/ml/math/EuclideanDistance.java       |  48 +++
 .../org/apache/ignite/ml/math/MathUtils.java    |  31 ++
 .../java/org/apache/ignite/ml/math/Matrix.java  |   2 +
 .../apache/ignite/ml/math/StorageConstants.java |   3 +
 .../org/apache/ignite/ml/math/VectorUtils.java  |  41 +++
 .../math/exceptions/ConvergenceException.java   |  48 +++
 .../exceptions/MathArithmeticException.java     |   6 +-
 .../exceptions/MathIllegalNumberException.java  |  51 ++++
 .../exceptions/MathIllegalStateException.java   |  49 +++
 .../exceptions/NumberIsTooSmallException.java   |  79 +++++
 .../ignite/ml/math/functions/Functions.java     |  39 +++
 .../apache/ignite/ml/math/impls/CacheUtils.java |  47 +++
 .../ml/math/impls/matrix/AbstractMatrix.java    |   2 +
 .../impls/matrix/SparseDistributedMatrix.java   |   2 +-
 .../impls/storage/matrix/MapWrapperStorage.java |  93 ++++++
 .../vector/SparseLocalOnHeapVectorStorage.java  |  32 ++
 .../ml/math/impls/vector/DelegatingVector.java  |   5 +
 .../ml/math/impls/vector/MapWrapperVector.java  |  32 ++
 .../ml/math/impls/vector/SparseLocalVector.java |   9 +
 .../ignite/ml/math/impls/vector/VectorView.java |   1 +
 .../ignite/ml/math/statistics/Variance.java     |  53 ++++
 .../org/apache/ignite/ml/math/util/MapUtil.java |  38 +++
 .../apache/ignite/ml/math/util/MatrixUtil.java  |  38 ++-
 .../org/apache/ignite/ml/IgniteMLTestSuite.java |   4 +-
 .../ml/clustering/ClusteringTesetSuite.java     |  15 +
 .../KMeansDistributedClustererTest.java         | 184 ++++++++++++
 .../ml/clustering/KMeansLocalClustererTest.java |  46 +++
 .../apache/ignite/ml/clustering/KMeansUtil.java |  33 ++
 .../ignite/ml/math/MathImplLocalTestSuite.java  |   1 +
 40 files changed, 1885 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/pom.xml
----------------------------------------------------------------------
diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml
index 1df5bb6..75ebf1b 100644
--- a/modules/ml/pom.xml
+++ b/modules/ml/pom.xml
@@ -85,6 +85,19 @@
             <version>${spring.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-rng-core</artifactId>
+            <version>1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-rng-simple</artifactId>
+            <version>1.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/Model.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/Model.java b/modules/ml/src/main/java/org/apache/ignite/ml/Model.java
new file mode 100644
index 0000000..e4d2117
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/Model.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml;
+
+import java.util.function.BiFunction;
+
+
+/** Basic interface for all models. */
+@FunctionalInterface
+public interface Model<T, V> {
+    /** Predict a result for value. */
+    public V predict(T val);
+
+    /**
+     * Combines this model with other model via specified combiner
+     *
+     * @param other Other model.
+     * @param combiner Combiner.
+     * @return Combination of models.
+     */
+    public default <X, W> Model<T, X> combine(Model<T, W> other, BiFunction<V, W, X> combiner) {
+        return v -> combiner.apply(predict(v), other.predict(v));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/BaseKMeansClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/BaseKMeansClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/BaseKMeansClusterer.java
new file mode 100644
index 0000000..a6acb8e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/BaseKMeansClusterer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.util.List;
+
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.math.DistanceMeasure;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.exceptions.ConvergenceException;
+import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
+import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
+
+/**
+ * This class is partly based on the corresponding class from Apache Common Math lib.
+ */
+public abstract class BaseKMeansClusterer<T extends Matrix> implements Clusterer<T, KMeansModel> {
+    /** The distance measure to use. */
+    private DistanceMeasure measure;
+
+    /**
+     * Build a new clusterer with the given {@link DistanceMeasure}.
+     *
+     * @param measure the distance measure to use
+     */
+    protected BaseKMeansClusterer(final DistanceMeasure measure) {
+        this.measure = measure;
+    }
+
+    /**
+     * Perform a cluster analysis on the given set of points.
+     *
+     * @param points the set of points
+     * @return a {@link List} of clusters
+     * @throws MathIllegalArgumentException if points are null or the number of data points is not compatible with this
+     * clusterer
+     * @throws ConvergenceException if the algorithm has not yet converged after the maximum number of iterations has
+     * been exceeded
+     */
+    public abstract KMeansModel cluster(T points, int k)
+        throws MathIllegalArgumentException, ConvergenceException;
+
+    /**
+     * Returns the {@link DistanceMeasure} instance used by this clusterer.
+     *
+     * @return the distance measure
+     */
+    public DistanceMeasure getDistanceMeasure() {
+        return measure;
+    }
+
+    /**
+     * Calculates the distance between two vectors.
+     * with the configured {@link DistanceMeasure}.
+     *
+     * @return the distance between the two clusterables
+     */
+    protected double distance(final Vector v1, final Vector v2) {
+        return measure.compute(v1, v2);
+    }
+
+    /**
+     * Find the closest cluster center index and distance to it from a given point.
+     *
+     * @param centers Centers to look in.
+     * @param pnt Point.
+     */
+    protected IgniteBiTuple<Integer, Double> findClosest(Vector[] centers, Vector pnt) {
+        double bestDistance = Double.POSITIVE_INFINITY;
+        int bestInd = 0;
+
+        for (int i = 0; i < centers.length; i++) {
+            double dist = distance(centers[i], pnt);
+            if (dist < bestDistance) {
+                bestDistance = dist;
+                bestInd = i;
+            }
+        }
+
+        return new IgniteBiTuple<>(bestInd, bestDistance);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/Clusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/Clusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/Clusterer.java
new file mode 100644
index 0000000..f03dc95
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/Clusterer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import org.apache.ignite.ml.Model;
+
+/**
+ * Base interface for clusterers.
+ */
+public interface Clusterer<P, M extends Model> {
+    /** Cluster given points set into k clusters.
+     *
+     * @param points Points set.
+     * @param k Clusters count.
+     */
+    public M cluster(P points, int k);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/ClusterizationModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/ClusterizationModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/ClusterizationModel.java
new file mode 100644
index 0000000..99afec5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/ClusterizationModel.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import org.apache.ignite.ml.Model;
+
+/** Base interface for all clusterization models. */
+public interface ClusterizationModel<P, V> extends Model<P, V> {
+    /** Gets the clusters count. */
+    public int clustersCount();
+
+    /** Get cluster centers. */
+    public P[] centers();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
new file mode 100644
index 0000000..09317d6
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.*;
+import org.apache.ignite.ml.math.exceptions.ConvergenceException;
+import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
+import org.apache.ignite.ml.math.functions.Functions;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.impls.CacheUtils;
+import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
+import org.apache.ignite.ml.math.util.MapUtil;
+import org.apache.ignite.ml.math.util.MatrixUtil;
+
+import javax.cache.Cache;
+
+import static org.apache.ignite.ml.math.impls.CacheUtils.*;
+import static org.apache.ignite.ml.math.util.MatrixUtil.localCopyOf;
+
+/**
+ * Clustering algorithm based on Bahmani et al. paper and Apache Spark class with corresponding functionality.
+ *
+ * @see <a href="http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf">Scalable K-Means++(wikipedia)</a>
+ */
+public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistributedMatrix> {
+    /** */
+    private final int maxIterations;
+
+    /** */
+    private Random rnd;
+
+    /** */
+    private int initSteps;
+
+    /** */
+    private long seed;
+
+    /** */
+    private double epsilon = 1e-4;
+
+    /** */
+    public KMeansDistributedClusterer(DistanceMeasure measure, int initSteps, int maxIterations, Long seed) {
+        super(measure);
+        this.initSteps = initSteps;
+
+        this.seed = seed != null ? seed : new Random().nextLong();
+
+        this.maxIterations = maxIterations;
+        rnd = new Random(this.seed);
+    }
+
+    /** */
+    @Override public KMeansModel cluster(SparseDistributedMatrix points, int k) throws
+            MathIllegalArgumentException, ConvergenceException {
+        SparseDistributedMatrix pointsCp = (SparseDistributedMatrix) points.like(points.rowSize(), points.columnSize());
+
+        // TODO: this copy is very ineffective, just for POC. Immutability of data should be guaranteed by other methods
+        // such as logical locks for example.
+        pointsCp.assign(points);
+
+        Vector[] centers = initClusterCenters(pointsCp, k);
+
+        boolean converged = false;
+        int iteration = 0;
+        int dim = pointsCp.viewRow(0).size();
+        IgniteUuid uid = pointsCp.getUUID();
+
+        // Execute iterations of Lloyd's algorithm until converged
+        while (iteration < maxIterations && !converged) {
+            SumsAndCounts stats = getSumsAndCounts(centers, dim, uid);
+
+            converged = true;
+
+            for (Integer ind : stats.sums.keySet()) {
+                Vector massCenter = stats.sums.get(ind).times(1.0 / stats.counts.get(ind));
+
+                if (converged && distance(massCenter, centers[ind]) > epsilon * epsilon)
+                    converged = false;
+
+                centers[ind] = massCenter;
+            }
+
+            iteration++;
+        }
+
+        pointsCp.destroy();
+
+        return new KMeansModel(centers, getDistanceMeasure());
+    }
+
+    /** Initialize cluster centers. */
+    private Vector[] initClusterCenters(SparseDistributedMatrix points, int k) {
+        // Initialize empty centers and point costs.
+        int ptsCount = points.rowSize();
+
+        // Initialize the first center to a random point.
+        Vector sample = localCopyOf(points.viewRow(rnd.nextInt(ptsCount)));
+
+        List<Vector> centers = new ArrayList<>();
+        List<Vector> newCenters = new ArrayList<>();
+        newCenters.add(sample);
+        centers.add(sample);
+
+        final ConcurrentHashMap<Integer, Double> costs = new ConcurrentHashMap<>();
+
+        // On each step, sample 2 * k points on average with probability proportional
+        // to their squared distance from the centers. Note that only distances between points
+        // and new centers are computed in each iteration.
+        int step = 0;
+        IgniteUuid uid = points.getUUID();
+
+        while (step < initSteps) {
+            // We assume here that costs can fit into memory of one node.
+            ConcurrentHashMap<Integer, Double> newCosts = getNewCosts(points, newCenters);
+
+            // Merge costs with new costs.
+            for (Integer ind : newCosts.keySet())
+                costs.merge(ind, newCosts.get(ind), Math::min);
+
+            double sumCosts = costs.values().stream().mapToDouble(Double::valueOf).sum();
+
+            newCenters = getNewCenters(k, costs, uid, sumCosts);
+            centers.addAll(newCenters);
+
+            step++;
+        }
+
+        List<Vector> distinctCenters = centers.stream().distinct().collect(Collectors.toList());
+
+        if (distinctCenters.size() <= k)
+            return distinctCenters.toArray(new Vector[] {});
+        else {
+            // Finally, we might have a set of more than k distinct candidate centers; weight each
+            // candidate by the number of points in the dataset mapping to it and run a local k-means++
+            // on the weighted centers to pick k of them
+            ConcurrentHashMap<Integer, Integer> centerInd2Weight = weightCenters(uid, distinctCenters);
+
+            List<Double> weights = new ArrayList<>(centerInd2Weight.size());
+
+            for (int i = 0; i < distinctCenters.size(); i++)
+                weights.add(i, Double.valueOf(centerInd2Weight.getOrDefault(i, 0)));
+
+            DenseLocalOnHeapMatrix dCenters = MatrixUtil.fromList(distinctCenters, true);
+
+            return new KMeansLocalClusterer(getDistanceMeasure(), 30, seed).cluster(dCenters, k, weights).centers();
+        }
+    }
+
+    /** */
+    private List<Vector> getNewCenters(int k, ConcurrentHashMap<Integer, Double> costs, IgniteUuid uid, double sumCosts) {
+        return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
+            (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>,
+                List<Vector>,
+                List<Vector>>)(vectorWithIndex, list) -> {
+                Integer ind = vectorWithIndex.getKey().get1();
+
+                double prob = costs.get(ind) * 2.0 * k / sumCosts;
+
+                if (new Random(seed ^ ind).nextDouble() < prob)
+                    list.add(VectorUtils.fromMap(vectorWithIndex.getValue(), false));
+
+                return list;
+            },
+            key -> key.get2().equals(uid),
+            (list1, list2) -> {
+                list1.addAll(list2);
+                return list1;
+            },
+            new ArrayList<>()
+        );
+    }
+
+    /** */
+    private ConcurrentHashMap<Integer, Double> getNewCosts(SparseDistributedMatrix points, List<Vector> newCenters) {
+        return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
+            (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, ConcurrentHashMap<Integer, Double>>,
+                ConcurrentHashMap<Integer, Double>,
+                ConcurrentHashMap<Integer, Double>>)(vectorWithIndex, map) -> {
+                for (Vector center : newCenters)
+                    map.merge(vectorWithIndex.getKey().get1(), distance(vectorWithIndex.getValue(), center), Functions.MIN);
+
+                return map;
+            },
+            key -> key.get2().equals(points.getUUID()),
+            (map1, map2) -> {
+                map1.putAll(map2);
+                return map1;
+            }, new ConcurrentHashMap<>());
+    }
+
+    /** */
+    private ConcurrentHashMap<Integer, Integer> weightCenters(IgniteUuid uid, List<Vector> distinctCenters) {
+        return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
+            (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>,
+                ConcurrentHashMap<Integer, Integer>,
+                ConcurrentHashMap<Integer, Integer>>)(vectorWithIndex, countMap) -> {
+                Integer resInd = -1;
+                Double resDist = Double.POSITIVE_INFINITY;
+
+                int i = 0;
+                for (Vector cent : distinctCenters) {
+                    double curDist = distance(vectorWithIndex.getValue(), cent);
+
+                    if (resDist > curDist) {
+                        resDist = curDist;
+                        resInd = i;
+                    }
+
+                    i++;
+                }
+
+                countMap.compute(resInd, (ind, v) -> v != null ? v + 1 : 1);
+                return countMap;
+            },
+            key -> key.get2().equals(uid),
+            (map1, map2) -> MapUtil.mergeMaps(map1, map2, (integer, integer2) -> integer2 + integer,
+                ConcurrentHashMap::new),
+            new ConcurrentHashMap<>());
+    }
+
+    /** */
+    private double distance(Map<Integer, Double> vecMap, Vector vector) {
+        return distance(VectorUtils.fromMap(vecMap, false), vector);
+    }
+
+    /** */
+    private SumsAndCounts getSumsAndCounts(Vector[] centers, int dim, IgniteUuid uid) {
+        return CacheUtils.distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
+                (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>, SumsAndCounts, SumsAndCounts>)(entry, counts) -> {
+                    Map<Integer, Double> vec = entry.getValue();
+
+                    IgniteBiTuple<Integer, Double> closest = findClosest(centers, VectorUtils.fromMap(vec, false));
+                    int bestCenterIdx = closest.get1();
+
+                    counts.totalCost += closest.get2();
+                    counts.sums.putIfAbsent(bestCenterIdx, VectorUtils.zeroes(dim));
+
+                    counts.sums.compute(bestCenterIdx,
+                            (IgniteBiFunction<Integer, Vector, Vector>)(ind, v) -> v.plus(VectorUtils.fromMap(vec, false)));
+
+                    counts.counts.merge(bestCenterIdx, 1,
+                            (IgniteBiFunction<Integer, Integer, Integer>)(i1, i2) -> i1 + i2);
+
+                    return counts;
+                },
+                key -> key.get2().equals(uid),
+                SumsAndCounts::merge, new SumsAndCounts()
+        );
+    }
+
+    /** Service class used for statistics. */
+    private static class SumsAndCounts {
+        /** */
+        public double totalCost;
+
+        /** */
+        public ConcurrentHashMap<Integer, Vector> sums = new ConcurrentHashMap<>();
+
+        /** Count of points closest to the center with a given index. */
+        public ConcurrentHashMap<Integer, Integer> counts = new ConcurrentHashMap<>();
+
+        /** Merge current */
+        public SumsAndCounts merge(SumsAndCounts other) {
+            this.totalCost += totalCost;
+            MapUtil.mergeMaps(sums, other.sums, Vector::plus, ConcurrentHashMap::new);
+            MapUtil.mergeMaps(counts, other.counts, (i1, i2) -> i1 + i2, ConcurrentHashMap::new);
+            return this;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
new file mode 100644
index 0000000..c98b818
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.ignite.internal.util.GridArgumentCheck;
+import org.apache.ignite.ml.math.*;
+import org.apache.ignite.ml.math.exceptions.ConvergenceException;
+import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
+import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
+
+import static org.apache.ignite.ml.math.util.MatrixUtil.localCopyOf;
+
+/**
+ * Perform clusterization on local data.
+ * This class is based on Apache Spark class with corresponding functionality.
+ */
+public class KMeansLocalClusterer extends BaseKMeansClusterer<DenseLocalOnHeapMatrix> implements
+    WeightedClusterer<DenseLocalOnHeapMatrix, KMeansModel> {
+    /** */
+    private int maxIterations;
+
+    /** */
+    private Random rand;
+
+    /**
+     * Build a new clusterer with the given {@link DistanceMeasure}.
+     *
+     * @param measure Distance measure to use.
+     * @param maxIterations maximal number of iterations.
+     * @param seed Seed used in random parts of algorithm.
+     */
+    public KMeansLocalClusterer(DistanceMeasure measure, int maxIterations, Long seed) {
+        super(measure);
+        this.maxIterations = maxIterations;
+        rand = seed != null ? new Random(seed) : new Random();
+    }
+
+    /** {@inheritDoc} */
+    @Override public KMeansModel cluster(
+        DenseLocalOnHeapMatrix points, int k) throws MathIllegalArgumentException, ConvergenceException {
+        List<Double> ones = new ArrayList<>(Collections.nCopies(points.rowSize(), 1.0));
+        return cluster(points, k, ones);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KMeansModel cluster(DenseLocalOnHeapMatrix points, int k,
+        List<Double> weights) throws MathIllegalArgumentException, ConvergenceException {
+
+        GridArgumentCheck.notNull(points, "points");
+
+        int dim = points.columnSize();
+        Vector[] centers = new Vector[k];
+
+        centers[0] = pickWeighted(points, weights);
+
+        Vector costs = points.foldRows(row -> distance(row,
+            centers[0]));
+
+        for (int i = 0; i < k; i++) {
+            double weightedSum = weightedSum(costs, weights);
+
+            double r = rand.nextDouble() * weightedSum;
+            double s = 0.0;
+            int j = 0;
+
+            while (j < points.rowSize() && s < r) {
+                s += weights.get(j) * costs.get(j);
+                j++;
+            }
+
+            if (j == 0)
+                // TODO: Process this case more carefully
+                centers[i] = localCopyOf(points.viewRow(0));
+            else
+                centers[i] = localCopyOf(points.viewRow(j - 1));
+
+            for (int p = 0; p < points.rowSize(); p++)
+                costs.setX(p, Math.min(getDistanceMeasure().compute(localCopyOf(points.viewRow(p)), centers[i]),
+                        costs.get(p)));
+        }
+
+        int[] oldClosest = new int[points.rowSize()];
+        Arrays.fill(oldClosest, -1);
+        int iter = 0;
+        boolean moved = true;
+
+        while (moved && iter < maxIterations) {
+            moved = false;
+
+            double[] counts = new double[k];
+            Arrays.fill(counts, 0.0);
+            Vector[] sums = new Vector[k];
+
+            Arrays.fill(sums, VectorUtils.zeroes(dim));
+
+            int i = 0;
+
+            while (i < points.rowSize()) {
+                Vector p = localCopyOf(points.viewRow(i));
+
+                int ind = findClosest(centers, p).get1();
+                sums[ind] = sums[ind].plus(p.times(weights.get(i)));
+
+                counts[ind] += weights.get(i);
+                if (ind != oldClosest[i]) {
+                    moved = true;
+                    oldClosest[i] = ind;
+                }
+                i++;
+            }
+            // Update centers
+            int j = 0;
+            while (j < k) {
+                if (counts[j] == 0.0) {
+                    // Assign center to a random point
+                    centers[j] = points.viewRow(rand.nextInt(points.rowSize()));
+                } else {
+                    sums[j] = sums[j].times(1.0 / counts[j]);
+                    centers[j] = sums[j];
+                }
+                j++;
+            }
+            iter++;
+        }
+
+        return new KMeansModel(centers, getDistanceMeasure());
+    }
+
+    /** Pick a random vector with a probability proportional to the corresponding weight. */
+    private Vector pickWeighted(Matrix points, List<Double> weights) {
+        double r = rand.nextDouble() * weights.stream().mapToDouble(Double::valueOf).sum();
+
+        int i = 0;
+        double curWeight = 0.0;
+
+        while (i < points.rowSize() && curWeight < r) {
+            curWeight += weights.get(i);
+            i += 1;
+        }
+
+        return localCopyOf(points.viewRow(i - 1));
+    }
+
+    /** Get a weighted sum of a vector v. */
+    private double weightedSum(Vector v, List<Double> weights) {
+        double res = 0.0;
+
+        for (int i = 0; i < v.size(); i++)
+            res += v.getX(i) * weights.get(i);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
new file mode 100644
index 0000000..6584273
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.util.Arrays;
+import org.apache.ignite.ml.math.DistanceMeasure;
+import org.apache.ignite.ml.math.Vector;
+
+/**
+ * This class encapsulates result of clusterization.
+ */
+public class KMeansModel implements ClusterizationModel<Vector, Integer> {
+    /** Centers of clusters. */
+    private Vector[] centers;
+
+    /** Distance measure. */
+    private DistanceMeasure distance;
+
+    /**
+     * Construct KMeans model with given centers and distance measure.
+     *
+     * @param centers Centers.
+     * @param distance Distance measure.
+     */
+    KMeansModel(Vector[] centers, DistanceMeasure distance) {
+        this.centers = centers;
+        this.distance = distance;
+    }
+
+    /** Distance measure used while clusterization */
+    public DistanceMeasure distanceMeasure() {
+        return distance;
+    }
+
+    /** Count of centers in clusterization. */
+    @Override public int clustersCount() {
+        return centers.length;
+    }
+
+    /** Get centers of clusters. */
+    @Override public Vector[] centers() {
+        return Arrays.copyOf(centers, centers.length);
+    }
+
+    /**
+     * Predict closest center index for a given vector.
+     *
+     * @param vec Vector.
+     */
+    public Integer predict(Vector vec) {
+        int res = -1;
+        double minDist = Double.POSITIVE_INFINITY;
+
+        for (int i = 0; i < centers.length; i++) {
+            double curDist = distance.compute(centers[i], vec);
+            if (curDist < minDist) {
+                minDist = curDist;
+                res = i;
+            }
+        }
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
new file mode 100644
index 0000000..55fb359
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.util.List;
+import org.apache.ignite.ml.Model;
+import org.apache.ignite.ml.math.exceptions.ConvergenceException;
+import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
+
+/**
+ * Support of clusterization with given weights.
+ */
+public interface WeightedClusterer<P, M extends Model> extends Clusterer<P, M> {
+    /**
+     * Perform clusterization of given points weighted by given weights.
+     *
+     * @param points Points.
+     * @param k count of centers.
+     * @param weights Weights.
+     */
+    public KMeansModel cluster(P points, int k, List<Double> weights) throws
+        MathIllegalArgumentException, ConvergenceException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/clustering/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/package-info.java
new file mode 100644
index 0000000..e83084a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains clustering algorithms.
+ */
+package org.apache.ignite.ml.clustering;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
new file mode 100644
index 0000000..0fd74ac
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math;
+
+import java.io.Externalizable;
+
+import org.apache.ignite.ml.math.exceptions.CardinalityException;
+
+/**
+ * This class is based on the corresponding class from Apache Common Math lib.
+ * Interface for distance measures of n-dimensional vectors.
+ */
+public interface DistanceMeasure extends Externalizable {
+    /**
+     * Compute the distance between two n-dimensional vectors.
+     * <p>
+     * The two vectors are required to have the same dimension.
+     *
+     * @param a the first vector
+     * @param b the second vector
+     * @return the distance between the two vectors
+     * @throws CardinalityException if the array lengths differ.
+     */
+    double compute(Vector a, Vector b) throws CardinalityException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
new file mode 100644
index 0000000..b748ac5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.ml.math.exceptions.CardinalityException;
+import org.apache.ignite.ml.math.util.MatrixUtil;
+
+/**
+ * Calculates the L<sub>2</sub> (Euclidean) distance between two points.
+ */
+public class EuclideanDistance implements DistanceMeasure {
+    /** Serializable version identifier. */
+    private static final long serialVersionUID = 1717556319784040040L;
+
+    /** {@inheritDoc} */
+    @Override
+    public double compute(Vector a, Vector b)
+    throws CardinalityException {
+        return MatrixUtil.localCopyOf(a).minus(b).kNorm(2.0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // No-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/MathUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/MathUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/MathUtils.java
new file mode 100644
index 0000000..c2164ab
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/MathUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math;
+
+import org.apache.ignite.ml.math.exceptions.NullArgumentException;
+
+/**
+ * Miscellaneous utility functions.
+ */
+public final class MathUtils {
+    public static void checkNotNull(Object o)
+        throws NullArgumentException {
+        if (o == null)
+            throw new NullArgumentException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/Matrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/Matrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/Matrix.java
index 2cf4e63..db822e6 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/Matrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/Matrix.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.ml.math;
 
 import java.io.Externalizable;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.exceptions.CardinalityException;
 import org.apache.ignite.ml.math.exceptions.IndexException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/StorageConstants.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/StorageConstants.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/StorageConstants.java
index ec2ee65..38d42ba 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/StorageConstants.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/StorageConstants.java
@@ -33,6 +33,9 @@ public interface StorageConstants {
     /** Storage mode optimized for column access. */
     public static final int COLUMN_STORAGE_MODE = 2002;
 
+    /** Storage mode is unknown. */
+    public static final int UNKNOWN_STORAGE_MODE = 3001;
+
     /**
      * @param mode Access mode to verify.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
new file mode 100644
index 0000000..f41a5fe
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math;
+
+import java.util.Map;
+import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.apache.ignite.ml.math.impls.vector.MapWrapperVector;
+import org.apache.ignite.ml.math.impls.vector.SparseLocalVector;
+
+public class VectorUtils {
+    /** Create new vector like given vector initialized by zeroes. */
+    public static Vector zeroesLike(Vector v) {
+        return v.like(v.size()).assign(0.0);
+    }
+
+    /** Create new */
+    public static DenseLocalOnHeapVector zeroes(int n) {
+        return (DenseLocalOnHeapVector) new DenseLocalOnHeapVector(n).assign(0.0);
+    }
+
+    /** */
+    public static Vector fromMap(Map<Integer, Double> value, boolean copy) {
+        return new MapWrapperVector(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/ConvergenceException.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/ConvergenceException.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/ConvergenceException.java
new file mode 100644
index 0000000..2cf0bcf
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/ConvergenceException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math.exceptions;
+
+/**
+ * This class is based on the corresponding class from Apache Common Math lib.
+ * Error thrown when a numerical computation can not be performed because the
+ * numerical result failed to converge to a finite value.
+ */
+public class ConvergenceException extends MathIllegalStateException {
+    /** Serializable version Id. */
+    private static final long serialVersionUID = 4330003017885151975L;
+
+    /** */
+    private static final String CONVERGENCE_FAILED = "convergence failed";
+
+    /**
+     * Construct the exception.
+     */
+    public ConvergenceException() {
+        this(CONVERGENCE_FAILED);
+    }
+
+    /**
+     * Construct the exception with a specific context and arguments.
+     *
+     * @param msg Message pattern providing the specific context of
+     * the error.
+     * @param args Arguments.
+     */
+    public ConvergenceException(String msg, Object ... args) {
+        super(msg, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathArithmeticException.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathArithmeticException.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathArithmeticException.java
index f48f3c5..ccd019c 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathArithmeticException.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathArithmeticException.java
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.ml.math.exceptions;
 
 /**
  * This class is based on the corresponding class from Apache Common Math lib.
  * Base class for arithmetic exceptions.
- * It is used for all the exceptions that have the semantics of the standard
- * {@link ArithmeticException}, but must also provide a localized
- * message.
  */
 public class MathArithmeticException extends MathRuntimeException {
     /** Serializable version Id. */
@@ -31,7 +29,7 @@ public class MathArithmeticException extends MathRuntimeException {
      * Default constructor.
      */
     public MathArithmeticException() {
-        this("arithmetic exception");
+        this("Arithmetic exception.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalNumberException.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalNumberException.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalNumberException.java
new file mode 100644
index 0000000..2e7280b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalNumberException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math.exceptions;
+
+/**
+ * This class is based on the corresponding class from Apache Common Math lib.
+ * Base class for exceptions raised by a wrong number.
+ * This class is not intended to be instantiated directly: it should serve
+ * as a base class to create all the exceptions that are raised because some
+ * precondition is violated by a number argument.
+ */
+public class MathIllegalNumberException extends MathIllegalArgumentException {
+    /** Serializable version Id. */
+    private static final long serialVersionUID = -7447085893598031110L;
+
+    /** Requested. */
+    private final Number argument;
+
+    /**
+     * Construct an exception.
+     *
+     * @param msg Localizable pattern.
+     * @param wrong Wrong number.
+     * @param arguments Arguments.
+     */
+    protected MathIllegalNumberException(String msg, Number wrong, Object... arguments) {
+        super(msg, wrong, arguments);
+        argument = wrong;
+    }
+
+    /**
+     * @return the requested value.
+     */
+    public Number getArgument() {
+        return argument;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalStateException.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalStateException.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalStateException.java
new file mode 100644
index 0000000..13ef5ca
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/MathIllegalStateException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math.exceptions;
+
+/**
+ * This class is based on the corresponding class from Apache Common Math lib.
+ * Base class for all exceptions that signal that the process
+ * throwing the exception is in a state that does not comply with
+ * the set of states that it is designed to be in.
+ */
+public class MathIllegalStateException extends MathRuntimeException {
+    /** Serializable version Id. */
+    private static final long serialVersionUID = -6024911025449780478L;
+
+    /** */
+    private static final String ILLEGAL_STATE= "Illegal state.";
+
+    /**
+     * Simple constructor.
+     *
+     * @param msg Message pattern explaining the cause of the error.
+     * @param args Arguments.
+     */
+    public MathIllegalStateException(String msg, Object ... args) {
+        super(msg, args);
+    }
+
+    /**
+     * Default constructor.
+     */
+    public MathIllegalStateException() {
+        this(ILLEGAL_STATE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/NumberIsTooSmallException.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/NumberIsTooSmallException.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/NumberIsTooSmallException.java
new file mode 100644
index 0000000..7427592
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/exceptions/NumberIsTooSmallException.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.math.exceptions;
+
+/**
+ * Exception to be thrown when a number is too small.
+ */
+public class NumberIsTooSmallException extends MathIllegalNumberException {
+    /** */
+    private static final String NUMBER_TOO_SMALL = "Number {1} is smaller than the minimum ({2}).";
+
+    /** */
+    private static final String NUMBER_TOO_SMALL_BOUND_EXCLUDED = "Number {1} is smaller than, or equal to, the minimum ({2}).";
+
+    /** Serializable version Id. */
+    private static final long serialVersionUID = -6100997100383932834L;
+    /**
+     * Higher bound.
+     */
+    private final Number min;
+    /**
+     * Whether the maximum is included in the allowed range.
+     */
+    private final boolean boundIsAllowed;
+
+    /**
+     * Construct the exception.
+     *
+     * @param wrong Value that is smaller than the minimum.
+     * @param min Minimum.
+     * @param boundIsAllowed Whether {@code min} is included in the allowed range.
+     */
+    public NumberIsTooSmallException(Number wrong, Number min, boolean boundIsAllowed) {
+        this(boundIsAllowed ? NUMBER_TOO_SMALL : NUMBER_TOO_SMALL_BOUND_EXCLUDED,
+            wrong, min, boundIsAllowed);
+    }
+
+    /**
+     * Construct the exception with a specific context.
+     *
+     * @param msg Specific context pattern.
+     * @param wrong Value that is smaller than the minimum.
+     * @param min Minimum.
+     * @param boundIsAllowed Whether {@code min} is included in the allowed range.
+     */
+    public NumberIsTooSmallException(String msg, Number wrong, Number min, boolean boundIsAllowed) {
+        super(msg, wrong, min);
+        this.min = min;
+        this.boundIsAllowed = boundIsAllowed;
+    }
+
+    /**
+     * @return {@code true} if the minimum is included in the allowed range.
+     */
+    public boolean getBoundIsAllowed() {
+        return boundIsAllowed;
+    }
+
+    /**
+     * @return the minimum.
+     */
+    public Number getMin() {
+        return min;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
index e86a5eb..22a453d 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.ml.math.functions;
 
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import java.util.List;
+
 /**
  * Compatibility with Apache Mahout.
  */
@@ -63,6 +67,9 @@ public final class Functions {
     /** Function that returns {@code a - b}. */
     public static final IgniteBiFunction<Double, Double, Double> MINUS = (a, b) -> a - b;
 
+    /** Function that returns {@code min(a, b)}. */
+    public static final IgniteBiFunction<Double, Double, Double> MIN = Math::min;
+
     /** Function that returns {@code abs(a - b)}. */
     public static final IgniteBiFunction<Double, Double, Double> MINUS_ABS = (a, b) -> Math.abs(a - b);
 
@@ -81,6 +88,38 @@ public final class Functions {
     /** Function that returns {@code a &lt; b ? -1 : a &gt; b ? 1 : 0}. */
     public static final IgniteBiFunction<Double, Double, Double> COMPARE = (a, b) -> a < b ? -1.0 : a > b ? 1.0 : 0.0;
 
+    /** */
+    public  static <A, B, C> IgniteFunction<B, C> curry(IgniteBiFunction<A, B, C> f, A a) {
+        return (IgniteFunction<B, C>)b -> f.apply(a, b);
+    }
+
+    /** */
+    public static <A, B extends Comparable<B>> IgniteBiTuple<Integer, A> argmin(List<A> args, IgniteFunction<A, B> f) {
+        A res = null;
+        B fRes = null;
+
+        if (!args.isEmpty()) {
+            res = args.iterator().next();
+            fRes = f.apply(res);
+        }
+
+        int resInd = 0;
+        int i = 0;
+
+        for (A arg : args) {
+            B curRes = f.apply(arg);
+
+            if (fRes.compareTo(curRes) > 0) {
+                res = arg;
+                resInd = i;
+                fRes = curRes;
+            }
+
+            i++;
+        }
+        return new IgniteBiTuple<>(resInd, res);
+    }
+
     /**
      * Function that returns {@code a + b}. {@code a} is a variable, {@code b} is fixed.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
index ace399b..836789b 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.ml.math.impls;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.BinaryOperator;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -28,6 +29,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -38,6 +40,7 @@ import org.apache.ignite.ml.math.ValueMapper;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.math.functions.IgniteConsumer;
 import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
 import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
 
 /**
@@ -380,6 +383,50 @@ public class CacheUtils {
         });
     }
 
+    public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
+        IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal) {
+        return sparseFold(cacheName, folder, keyFilter, accumulator, zeroVal, null, null, 0 ,
+            false);
+    }
+
+    private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
+        IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal, V defVal, K defKey, long defValCnt, boolean isNilpotent) {
+
+        A defRes = zeroVal;
+
+        if (!isNilpotent)
+            for (int i = 0; i < defValCnt; i++)
+                defRes = folder.apply(new CacheEntryImpl<>(defKey, defVal), defRes);
+
+        Collection<A> totalRes = bcast(cacheName, () -> {
+            Ignite ignite = Ignition.localIgnite();
+            IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
+
+            int partsCnt = ignite.affinity(cacheName).partitions();
+
+            // Use affinity in filter for ScanQuery. Otherwise we accept consumer in each node which is wrong.
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode localNode = ignite.cluster().localNode();
+
+            A a = zeroVal;
+
+            // Iterate over all partitions. Some of them will be stored on that local node.
+            for (int part = 0; part < partsCnt; part++) {
+                int p = part;
+
+                // Iterate over given partition.
+                // Query returns an empty cursor if this partition is not stored on this node.
+                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
+                    (k, v) -> affinity.mapPartitionToNode(p) == localNode  && (keyFilter == null || keyFilter.apply(k)))))
+                    a = folder.apply(entry, a);
+            }
+
+            return a;
+        });
+        totalRes.add(defRes);
+        return totalRes.stream().reduce(zeroVal, accumulator);
+    }
+
     /**
      * @param cacheName Cache name.
      * @param run {@link Runnable} to broadcast to cache nodes for given cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index d1d3904..106a425 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.MatrixStorage;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index 3e508bd..cebacc5 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -156,7 +156,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
     }
 
     /** */
-    private IgniteUuid getUUID(){
+    public IgniteUuid getUUID(){
         return ((SparseDistributedMatrixStorage) getStorage()).getUUID();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
new file mode 100644
index 0000000..13b8303
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
@@ -0,0 +1,93 @@
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import org.apache.ignite.internal.util.GridArgumentCheck;
+import org.apache.ignite.ml.math.VectorStorage;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Storage for wrapping given map.
+ */
+public class MapWrapperStorage implements VectorStorage {
+    /** Underlying map. */
+    Map<Integer, Double> data;
+
+    /** Vector size. */
+    int size;
+
+    /**
+     * Construct a wrapper around given map.
+     *
+     * @param map Map to wrap.
+     */
+    public MapWrapperStorage(Map<Integer, Double> map) {
+        Set<Integer> keys = map.keySet();
+
+        GridArgumentCheck.notEmpty(keys, "map");
+
+        Integer min = keys.stream().mapToInt(Integer::valueOf).min().getAsInt();
+        Integer max = keys.stream().mapToInt(Integer::valueOf).max().getAsInt();
+
+        assert min >= 0;
+
+        data = map;
+        size = (max - min) + 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double get(int i) {
+        return data.getOrDefault(i, 0.0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void set(int i, double v) {
+        if (v != 0.0)
+            data.put(i, v);
+        else if (data.containsKey(i))
+            data.remove(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        data = (Map<Integer, Double>) in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isSequentialAccess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRandomAccess() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDense() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isArrayBased() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDistributed() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
index 8400758..f07a16e 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.Int2DoubleRBTreeMap;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.ml.math.StorageConstants;
 import org.apache.ignite.ml.math.VectorStorage;
@@ -46,6 +47,37 @@ public class SparseLocalOnHeapVectorStorage implements VectorStorage, StorageCon
     }
 
     /**
+     *
+     * @param map
+     */
+    public SparseLocalOnHeapVectorStorage(Map<Integer, Double> map, boolean copy) {
+        assert map.size() > 0;
+
+        this.size = map.size();
+
+        if (map instanceof Int2DoubleRBTreeMap)
+            acsMode = SEQUENTIAL_ACCESS_MODE;
+        else
+            if (map instanceof Int2DoubleOpenHashMap)
+                acsMode = RANDOM_ACCESS_MODE;
+            else
+                acsMode = UNKNOWN_STORAGE_MODE;
+
+        if (copy)
+            switch (acsMode) {
+                case SEQUENTIAL_ACCESS_MODE:
+                    sto = new Int2DoubleRBTreeMap(map);
+                case RANDOM_ACCESS_MODE:
+                    sto = new Int2DoubleOpenHashMap(map);
+                    break;
+                default:
+                    sto = new HashMap<>(map);
+            }
+        else
+            sto = map;
+    }
+
+    /**
      * @param size Vector size.
      * @param acsMode Access mode.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/DelegatingVector.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/DelegatingVector.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/DelegatingVector.java
index c868160..48fbd06 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/DelegatingVector.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/DelegatingVector.java
@@ -59,6 +59,11 @@ public class DelegatingVector implements Vector {
         this.dlg = dlg;
     }
 
+    /** Get the delegating vector */
+    public Vector getVector() {
+        return dlg;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(dlg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
new file mode 100644
index 0000000..729a7ea
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
@@ -0,0 +1,32 @@
+package org.apache.ignite.ml.math.impls.vector;
+
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.impls.storage.matrix.MapWrapperStorage;
+
+import java.util.Map;
+
+/**
+ * Vector wrapping a given map.
+ */
+public class MapWrapperVector extends AbstractVector {
+
+    /**
+     * Construct a vector wrapping given map.
+     *
+     * @param map Map to wrap.
+     */
+    public MapWrapperVector(Map<Integer, Double> map) {
+        setStorage(new MapWrapperStorage(map));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Vector like(int crd) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Matrix likeMatrix(int rows, int cols) {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/SparseLocalVector.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/SparseLocalVector.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/SparseLocalVector.java
index e188f70..be5d0f6 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/SparseLocalVector.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/SparseLocalVector.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.ml.math.impls.vector;
 
+import java.util.Map;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.StorageConstants;
 import org.apache.ignite.ml.math.Vector;
@@ -35,6 +36,14 @@ public class SparseLocalVector extends AbstractVector implements StorageConstant
     }
 
     /**
+     * @param map Underlying map.
+     * @param copy Should given map be copied.
+     */
+    public SparseLocalVector(Map<Integer, Double> map, boolean copy) {
+        setStorage(new SparseLocalOnHeapVectorStorage(map, copy));
+    }
+
+    /**
      * @param size Vector size.
      * @param acsMode Vector elements access mode.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/VectorView.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/VectorView.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/VectorView.java
index f3bd4dd..0988059 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/VectorView.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/VectorView.java
@@ -60,6 +60,7 @@ public class VectorView extends AbstractVector {
 
     /** {@inheritDoc} */
     @Override public Vector copy() {
+        // TODO: revise this
         DelegateVectorStorage sto = storage();
 
         return new VectorView(sto.delegate(), sto.offset(), sto.length());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
new file mode 100644
index 0000000..e406b5b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.statistics;
+
+/**
+ * This class encapsulates calculating variance.
+ */
+public class Variance {
+    /** */
+    private double mean;
+
+    /** */
+    private long n;
+
+    /** */
+    private double m2;
+
+    public Variance() {
+        mean = 0;
+        n = 0;
+        m2 = 0;
+    }
+
+    /** */
+    public Variance update(Double x) {
+        n++;
+        double delta = x - mean;
+        mean += delta / n;
+        double delta2 = x - mean;
+        m2 += delta * delta2;
+        return this;
+    }
+
+    /** */
+    public double getResult() {
+        return m2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
new file mode 100644
index 0000000..6c25f0e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.util;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ *
+ */
+public class MapUtil {
+    /** */
+    public static <K, V, M extends Map<K, V>> M mergeMaps(M m1, M m2, BinaryOperator<V> op, Supplier<M> mapSupplier) {
+        return Stream.of(m1, m2)
+            .map(Map::entrySet)
+            .flatMap(Collection::stream)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, op, mapSupplier));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MatrixUtil.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MatrixUtil.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MatrixUtil.java
index a06b773..5ef7176 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MatrixUtil.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MatrixUtil.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.ml.math.util;
 
+import java.util.List;
+
+import org.apache.ignite.internal.util.GridArgumentCheck;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.Vector;
 import org.apache.ignite.ml.math.impls.matrix.CacheMatrix;
@@ -24,7 +27,6 @@ import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
 import org.apache.ignite.ml.math.impls.matrix.MatrixView;
 import org.apache.ignite.ml.math.impls.matrix.PivotedMatrixView;
 import org.apache.ignite.ml.math.impls.matrix.RandomMatrix;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
 import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
 
 /**
@@ -117,6 +119,38 @@ public class MatrixUtil {
     /** */
     private static boolean isCopyLikeSupport(Matrix matrix) {
         return matrix instanceof RandomMatrix || matrix instanceof MatrixView || matrix instanceof CacheMatrix ||
-            matrix instanceof PivotedMatrixView || matrix instanceof SparseDistributedMatrix;
+            matrix instanceof PivotedMatrixView;
+    }
+
+    /** */
+    public static DenseLocalOnHeapMatrix fromList(List<Vector> vecs, boolean entriesAreRows) {
+        GridArgumentCheck.notEmpty(vecs, "vecs");
+
+        int dim = vecs.get(0).size();
+        int vecsSize = vecs.size();
+
+        DenseLocalOnHeapMatrix res = new DenseLocalOnHeapMatrix(entriesAreRows ? vecsSize : dim,
+            entriesAreRows ? dim : vecsSize);
+
+        for (int i = 0; i < vecsSize; i++) {
+            for (int j = 0; j < dim; j++) {
+                int r = entriesAreRows ? i : j;
+                int c = entriesAreRows ? j : i;
+
+                res.setX(r, c, vecs.get(i).get(j));
+            }
+        }
+
+        return res;
+    }
+
+    /** TODO: rewrite in a more optimal way. */
+    public static DenseLocalOnHeapVector localCopyOf(Vector vec) {
+        DenseLocalOnHeapVector res = new DenseLocalOnHeapVector(vec.size());
+
+        for (int i = 0; i < vec.size(); i++)
+            res.setX(i, vec.getX(i));
+
+        return res;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java
index 92aa7db..dea3edf 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/IgniteMLTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.ml;
 
+import org.apache.ignite.ml.clustering.ClusteringTesetSuite;
 import org.apache.ignite.ml.math.MathImplMainTestSuite;
 import org.apache.ignite.ml.regressions.RegressionsTestSuite;
 import org.junit.runner.RunWith;
@@ -28,7 +29,8 @@ import org.junit.runners.Suite;
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     MathImplMainTestSuite.class,
-    RegressionsTestSuite.class
+    RegressionsTestSuite.class,
+    ClusteringTesetSuite.class
 })
 public class IgniteMLTestSuite {
     // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b04b5800/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTesetSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTesetSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTesetSuite.java
new file mode 100644
index 0000000..122d0c1
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTesetSuite.java
@@ -0,0 +1,15 @@
+package org.apache.ignite.ml.clustering;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Test suite for all tests located in org.apache.ignite.ml.clustering package.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        KMeansDistributedClustererTest.class,
+        KMeansLocalClustererTest.class
+})
+public class ClusteringTesetSuite {
+}