You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/05/07 16:07:29 UTC

[2/2] flink git commit: [FLINK-1933] [ml] Adds distance metrics and documentation to machine learning library

[FLINK-1933] [ml] Adds distance metrics and documentation to machine learning library

[ml] [docs] Add a overview documentation for Distance Metrics

[FLINK-1933] [ml] Moves metrics.distances package to org.apache.flink.ml

This closes #629.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddb2b347
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddb2b347
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddb2b347

Branch: refs/heads/master
Commit: ddb2b34704a410491d428dec2ade0cb20a58ed80
Parents: 2f08d9d
Author: Chiwan Park <ch...@icloud.com>
Authored: Fri Apr 24 12:50:28 2015 +0900
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 7 16:06:56 2015 +0200

----------------------------------------------------------------------
 docs/libs/ml/distance_metrics.md                | 56 ++++++++++++++
 docs/libs/ml/index.md                           |  6 +-
 .../org/apache/flink/ml/math/DenseVector.scala  | 23 ++++++
 .../org/apache/flink/ml/math/SparseVector.scala | 38 +++++++++-
 .../scala/org/apache/flink/ml/math/Vector.scala | 13 ++++
 .../distances/ChebyshevDistanceMetric.scala     | 37 ++++++++++
 .../distances/CosineDistanceMetric.scala        | 45 ++++++++++++
 .../ml/metrics/distances/DistanceMetric.scala   | 37 ++++++++++
 .../distances/EuclideanDistanceMetric.scala     | 41 +++++++++++
 .../distances/ManhattanDistanceMetric.scala     | 37 ++++++++++
 .../distances/MinkowskiDistanceMetric.scala     | 41 +++++++++++
 .../SquaredEuclideanDistanceMetric.scala        | 37 ++++++++++
 .../distances/TanimotoDistanceMetric.scala      | 40 ++++++++++
 .../apache/flink/ml/math/DenseVectorSuite.scala | 36 +++++++++
 .../flink/ml/math/SparseVectorSuite.scala       | 36 +++++++++
 .../metrics/distances/DistanceMetricSuite.scala | 77 ++++++++++++++++++++
 16 files changed, 598 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/docs/libs/ml/distance_metrics.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/distance_metrics.md b/docs/libs/ml/distance_metrics.md
new file mode 100644
index 0000000..9d87fa1
--- /dev/null
+++ b/docs/libs/ml/distance_metrics.md
@@ -0,0 +1,56 @@
+---
+title: Distance Metrics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+Different metrics of distance are convenient for different types of analysis. Flink Machine
+Learning Library provides built-in implementations for many standard distance metric. You can also
+use other distance metric by implementing `DistanceMetric` trait.
+
+## Built-in Implementations
+
+* Euclidean Distance
+* Squared Euclidean Distance
+* Cosine Distance
+* Tanimoto Distance
+* Chebyshev Distance
+* Manhattan Distance
+* Minkowski Distance
+
+## Custom Implementation
+
+You can use your own distance metric by implementing `DistanceMetric` trait.
+
+{% highlight scala %}
+class MyDistance extends DistanceMetric {
+  override def distance(a: Vector, b: Vector) = ... // your implementation for distance metric
+}
+
+object MyDistance {
+  def apply() = new MyDistance()
+}
+
+val metric = MyDistance()
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index 9753e68..0754045 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -36,4 +36,8 @@ under the License.
 * [Communication efficient distributed dual coordinate ascent (CoCoA)](cocoa.html)
 * [Multiple linear regression](multiple_linear_regression.html)
 * [Polynomial Base Feature Mapper](polynomial_base_feature_mapper.html)
-* [Standard Scaler](standard_scaler.html)
\ No newline at end of file
+* [Standard Scaler](standard_scaler.html)
+
+## Metrics
+
+* [Distance Metrics](distance_metrics.html)

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
index e5c6187..851a283 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
@@ -80,6 +80,29 @@ case class DenseVector(val data: Array[Double]) extends Vector with Serializable
     data(index) = value
   }
 
+  /** Returns the dot product of the recipient and the argument
+    *
+    * @param other a Vector
+    * @return a scalar double of dot product
+    */
+  override def dot(other: Vector): Double = {
+    require(size == other.size, "The size of vector must be equal.")
+
+    other match {
+      case SparseVector(_, otherIndices, otherData) =>
+        otherIndices.zipWithIndex.map {
+          case (idx, sparseIdx) => data(idx) * otherData(sparseIdx)
+        }.sum
+      case _ => (0 until size).map(i => data(i) * other(i)).sum
+    }
+  }
+
+  /** Magnitude of a vector
+    *
+    * @return
+    */
+  override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
+
   def toSparseVector: SparseVector = {
     val nonZero = (0 until size).zip(data).filter(_._2 != 0)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
index 6cf4c63..0762efb 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
@@ -23,7 +23,7 @@ import scala.util.Sorting
 /** Sparse vector implementation storing the data in two arrays. One index contains the sorted
   * indices of the non-zero vector entries and the other the corresponding vector entries
   */
-class SparseVector(
+case class SparseVector(
     val size: Int,
     val indices: Array[Int],
     val data: Array[Double])
@@ -53,6 +53,42 @@ class SparseVector(
     new SparseVector(size, indices.clone, data.clone)
   }
 
+  /** Returns the dot product of the recipient and the argument
+    *
+    * @param other a Vector
+    * @return a scalar double of dot product
+    */
+  override def dot(other: Vector): Double = {
+    require(size == other.size, "The size of vector must be equal.")
+    other match {
+      case DenseVector(otherData) =>
+        indices.zipWithIndex.map { case (sparseIdx, idx) => data(idx) * otherData(sparseIdx) }.sum
+      case SparseVector(_, otherIndices, otherData) =>
+        var left = 0
+        var right = 0
+        var result = 0.0
+
+        while (left < indices.length && right < otherIndices.length) {
+          if (indices(left) < otherIndices(right)) {
+            left += 1
+          } else if (otherIndices(right) < indices(left)) {
+            right += 1
+          } else {
+            result += data(left) * otherData(right)
+            left += 1
+            right += 1
+          }
+        }
+        result
+    }
+  }
+
+  /** Magnitude of a vector
+    *
+    * @return
+    */
+  override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
+
   /** Element wise access function
     *
     * * @param index index of the accessed element

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
index ef6b7aa..83e0c65 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
@@ -49,6 +49,19 @@ trait Vector {
     */
   def copy: Vector
 
+  /** Returns the dot product of the recipient and the argument
+    *
+    * @param other a Vector
+    * @return a scalar double of dot product
+    */
+  def dot(other: Vector): Double
+
+  /** Magnitude of a vector
+    *
+    * @return
+    */
+  def magnitude: Double
+
   def equalsVector(vector: Vector): Boolean = {
     if(size == vector.size) {
       (0 until size) forall { idx =>

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
new file mode 100644
index 0000000..055ede3
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a Chebyshev distance metric. The class calculates the distance between
+  * the given vectors by finding the maximum difference between each coordinate.
+  *
+  * @see http://en.wikipedia.org/wiki/Chebyshev_distance
+  */
+class ChebyshevDistanceMetric extends DistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+    (0 until a.size).map(i => math.abs(a(i) - b(i))).max
+  }
+}
+
+object ChebyshevDistanceMetric {
+  def apply() = new ChebyshevDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
new file mode 100644
index 0000000..f32ea26
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a cosine distance metric. The class calculates the distance between
+  * the given vectors by dividing the dot product of two vectors by the product of their lengths.
+  * We convert the result of division to a usable distance. So, 1 - cos(angle) is actually returned.
+  *
+  * @see http://en.wikipedia.org/wiki/Cosine_similarity
+  */
+class CosineDistanceMetric extends DistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+
+    val dotProd: Double = a.dot(b)
+    val denominator: Double = a.magnitude * b.magnitude
+    if (dotProd == 0 && denominator == 0) {
+      0
+    } else {
+      1 - dotProd / denominator
+    }
+  }
+}
+
+object CosineDistanceMetric {
+  def apply() = new CosineDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
new file mode 100644
index 0000000..1297ffb
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** DistanceMeasure interface is used for object which determines distance between two points.
+  */
+trait DistanceMetric {
+  /** Returns the distance between the arguments.
+    *
+    * @param a a Vector defining a multi-dimensional point in some space
+    * @param b a Vector defining a multi-dimensional point in some space
+    * @return a scalar double of the distance
+    */
+  def distance(a: Vector, b: Vector): Double
+
+  protected def checkValidArguments(a: Vector, b: Vector) = {
+    require(a.size == b.size, "The each size of vectors must be same to calculate distance.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
new file mode 100644
index 0000000..153fb93
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a Euclidean distance metric. The metric calculates the distance between
+  * the given two vectors by summing the square root of the squared differences between
+  * each coordinate.
+  *
+  * http://en.wikipedia.org/wiki/Euclidean_distance
+  *
+  * If you don't care about the true distance and only need for comparison,
+  * [[SquaredEuclideanDistanceMetric]] will be faster because it doesn't calculate the actual
+  * square root of the distances.
+  *
+  * @see http://en.wikipedia.org/wiki/Euclidean_distance
+  */
+class EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = math.sqrt(super.distance(a, b))
+}
+
+object EuclideanDistanceMetric {
+  def apply() = new EuclideanDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
new file mode 100644
index 0000000..5983f79
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a Manhattan distance metric. The class calculates the distance between
+  * the given vectors by summing the differences between each coordinate.
+  *
+  * @see http://en.wikipedia.org/wiki/Taxicab_geometry
+  */
+class ManhattanDistanceMetric extends DistanceMetric{
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+    (0 until a.size).map(i => math.abs(a(i) - b(i))).sum
+  }
+}
+
+object ManhattanDistanceMetric {
+  def apply() = new ManhattanDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
new file mode 100644
index 0000000..50161d4
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a Minkowski distance metric. The metric is a generalization of
+  * L(p) distances: Euclidean distance and Manhattan distance. If you need for a special case of
+  * p = 1 or p = 2, use [[ManhattanDistanceMetric]], [[EuclideanDistanceMetric]]. This class is
+  * useful for high exponents.
+  *
+  * @param p the norm exponent of space
+  *
+  * @see http://en.wikipedia.org/wiki/Minkowski_distance
+  */
+class MinkowskiDistanceMetric(val p: Double) extends DistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+    math.pow((0 until a.size).map(i => math.pow(math.abs(a(i) - b(i)), p)).sum, 1 / p)
+  }
+}
+
+object MinkowskiDistanceMetric {
+  def apply(p: Double) = new MinkowskiDistanceMetric(p)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
new file mode 100644
index 0000000..fe546e9
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class is like [[EuclideanDistanceMetric]] but it does not take the square root.
+  *
+  * The value calculated by this class is not exact Euclidean distance, but it saves on computation
+  * when you need the value for only comparison.
+  */
+class SquaredEuclideanDistanceMetric extends DistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+    (0 until a.size).map(i => math.pow(a(i) - b(i), 2)).sum
+  }
+}
+
+object SquaredEuclideanDistanceMetric {
+  def apply() = new SquaredEuclideanDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
new file mode 100644
index 0000000..5141c98
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.Vector
+
+/** This class implements a Tanimoto distance metric. The class calculates the distance between
+  * the given vectors. The vectors are assumed as bit-wise vectors. We convert the result of
+  * division to a usable distance. So, 1 - similarity is actually returned.
+  *
+  * @see http://en.wikipedia.org/wiki/Jaccard_index
+  */
+class TanimotoDistanceMetric extends DistanceMetric {
+  override def distance(a: Vector, b: Vector): Double = {
+    checkValidArguments(a, b)
+
+    val dotProd: Double = a.dot(b)
+    1 - dotProd / (a.magnitude * a.magnitude + b.magnitude * b.magnitude - dotProd)
+  }
+}
+
+object TanimotoDistanceMetric {
+  def apply() = new TanimotoDistanceMetric()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
index 553f672..4146b8d 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
@@ -47,4 +47,40 @@ class DenseVectorSuite extends FlatSpec with Matchers {
       vector(size)
     }
   }
+  
+  it should "calculate dot product with DenseVector" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "calculate dot product with SparseVector" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = SparseVector.fromCOO(3, (0, 1), (1, 1))
+
+    vec1.dot(vec2) should be(1)
+  }
+
+  it should "calculate dot product with SparseVector 2" in {
+    val vec1 = DenseVector(Array(1, 0, 1, 0, 0))
+    val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1))
+
+    vec1.dot(vec2) should be(1)
+  }
+
+  it should "fail in case of calculation dot product with different size vector" in {
+    val vec1 = DenseVector(Array(1, 0))
+    val vec2 = DenseVector(Array(0))
+
+    intercept[IllegalArgumentException] {
+      vec1.dot(vec2)
+    }
+  }
+
+  it should "calculate magnitude of vector" in {
+    val vec = DenseVector(Array(1, 4, 8))
+
+    vec.magnitude should be(9)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
index cde141c..474fb23 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
@@ -97,4 +97,40 @@ class SparseVectorSuite extends FlatSpec with Matchers {
 
     sparseVector should not equal(copy)
   }
+
+  it should "calculate dot product with SparseVector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = SparseVector.fromCOO(4, (1, 1), (3, 1))
+
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "calculate dot product with SparseVector 2" in {
+    val vec1 = SparseVector.fromCOO(5, (2, 3), (4, 1))
+    val vec2 = SparseVector.fromCOO(5, (4, 2), (2, 1))
+
+    vec1.dot(vec2) should be(5)
+  }
+
+  it should "calculate dot product with DenseVector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = DenseVector(Array(0, 1, 0, 1))
+
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "fail in case of calculation dot product with different size vector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
+    intercept[IllegalArgumentException] {
+      vec1.dot(vec2)
+    }
+  }
+
+  it should "calculate magnitude of vector" in {
+    val vec = SparseVector.fromCOO(3, (0, 1), (1, 4), (2, 8))
+
+    vec.magnitude should be(9)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddb2b347/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
new file mode 100644
index 0000000..569a1a8
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+
+class DistanceMetricSuite extends FlatSpec with Matchers {
+  val EPSILON = 1e-8
+
+  behavior of "Distance Measures"
+
+  it should "calculate Euclidean distance correctly" in {
+    val vec1 = DenseVector(1, 9)
+    val vec2 = DenseVector(5, 6)
+
+    EuclideanDistanceMetric().distance(vec1, vec2) should be(5)
+  }
+
+  it should "calculate square value of Euclidean distance correctly" in {
+    val vec1 = DenseVector(1, 9)
+    val vec2 = DenseVector(5, 6)
+
+    SquaredEuclideanDistanceMetric().distance(vec1, vec2) should be(25)
+  }
+
+  it should "calculate Chebyshev distance correctly" in {
+    val vec1 = DenseVector(0, 3, 6)
+    val vec2 = DenseVector(0, 0, 0)
+
+    ChebyshevDistanceMetric().distance(vec1, vec2) should be(6)
+  }
+
+  it should "calculate Cosine distance correctly" in {
+    val vec1 = DenseVector(1, 0)
+    val vec2 = DenseVector(2, 2)
+
+    CosineDistanceMetric().distance(vec1, vec2) should be((1 - math.sqrt(2) / 2) +- EPSILON)
+  }
+
+  it should "calculate Manhattan distance correctly" in {
+    val vec1 = DenseVector(0, 0, 0, 1, 1, 1)
+    val vec2 = DenseVector(1, 1, 1, 0, 0, 0)
+
+    ManhattanDistanceMetric().distance(vec1, vec2) should be(6)
+  }
+
+  it should "calculate Minkowski distance correctly" in {
+    val vec1 = DenseVector(0, 0, 1, 1, 0)
+    val vec2 = DenseVector(1, 1, 0, 1, 2)
+
+    MinkowskiDistanceMetric(3).distance(vec1, vec2) should be(math.pow(11, 1.0 / 3) +- EPSILON)
+  }
+
+  it should "calculate Tanimoto distance correctly" in {
+    val vec1 = DenseVector(0, 1, 1)
+    val vec2 = DenseVector(1, 1, 0)
+
+    TanimotoDistanceMetric().distance(vec1, vec2) should be(1 - (1.0 / (2 + 2 - 1)) +- EPSILON)
+  }
+}