Posted to commits@spark.apache.org by yl...@apache.org on 2017/02/16 00:26:08 UTC

spark git commit: [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing

Repository: spark
Updated Branches:
  refs/heads/master 21b4ba2d6 -> 08c1972a0


[SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing

## What changes were proposed in this pull request?
This pull request adds the Python API and examples for Locality Sensitive Hashing (LSH). The API changes are based on yanboliang's PR #15768, with conflicts resolved and subsequent Scala API changes incorporated. The examples are consistent with the Scala examples for MinHashLSH and BucketedRandomProjectionLSH; a minimal usage sketch is shown below, and the full runnable examples are in the diff.
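
For orientation, here is a minimal sketch of the new Python API (the data and column names are illustrative; the complete, tested examples live under `examples/src/main/python/ml/` in this commit):

```python
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LSHSketch").getOrCreate()

# Binary feature vectors; MinHashLSH treats any non-zero value as "1".
df = spark.createDataFrame(
    [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
     (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0])),
     (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]))],
    ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(df)

# Feature transformation: hash the input rows.
model.transform(df).show()

# Approximate self-join on Jaccard distance smaller than 0.6.
model.approxSimilarityJoin(df, df, 0.6, distCol="JaccardDistance").show()

# Approximate nearest neighbor search for a query vector.
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
model.approxNearestNeighbors(df, key, 2).show()

spark.stop()
```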

## How was this patch tested?
The API and examples were tested using spark-submit:
`bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py`
`bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py`

User guide changes were generated and manually inspected:
`SKIP_API=1 jekyll build`

Author: Yun Ni <yu...@uber.com>
Author: Yanbo Liang <yb...@gmail.com>
Author: Yunni <Eu...@gmail.com>

Closes #16715 from Yunni/spark-18080.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08c1972a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08c1972a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08c1972a

Branch: refs/heads/master
Commit: 08c1972a0661d42f300520cc6e5fb31023de093b
Parents: 21b4ba2
Author: Yun Ni <yu...@uber.com>
Authored: Wed Feb 15 16:26:05 2017 -0800
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Wed Feb 15 16:26:05 2017 -0800

----------------------------------------------------------------------
 docs/ml-features.md                             |  17 ++
 .../JavaBucketedRandomProjectionLSHExample.java |  38 ++-
 .../examples/ml/JavaMinHashLSHExample.java      |  57 +++-
 .../bucketed_random_projection_lsh_example.py   |  81 ++++++
 .../src/main/python/ml/min_hash_lsh_example.py  |  81 ++++++
 .../ml/BucketedRandomProjectionLSHExample.scala |  39 ++-
 .../spark/examples/ml/MinHashLSHExample.scala   |  43 ++-
 .../scala/org/apache/spark/ml/feature/LSH.scala |   7 +-
 python/pyspark/ml/feature.py                    | 291 +++++++++++++++++++
 9 files changed, 601 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/docs/ml-features.md
----------------------------------------------------------------------
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 13d97a2..57605ba 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1558,6 +1558,15 @@ for more details on the API.
 
 {% include_example java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java %}
 </div>
+
+<div data-lang="python" markdown="1">
+
+Refer to the [BucketedRandomProjectionLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.BucketedRandomProjectionLSH)
+for more details on the API.
+
+{% include_example python/ml/bucketed_random_projection_lsh_example.py %}
+</div>
+
 </div>
 
 ### MinHash for Jaccard Distance
@@ -1590,4 +1599,12 @@ for more details on the API.
 
 {% include_example java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java %}
 </div>
+
+<div data-lang="python" markdown="1">
+
+Refer to the [MinHashLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.MinHashLSH)
+for more details on the API.
+
+{% include_example python/ml/min_hash_lsh_example.py %}
+</div>
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
index ca3ee5a..4594e34 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
@@ -35,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.functions.col;
 // $example off$
 
+/**
+ * An example demonstrating BucketedRandomProjectionLSH.
+ * Run with:
+ *   bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample
+ */
 public class JavaBucketedRandomProjectionLSHExample {
   public static void main(String[] args) {
     SparkSession spark = SparkSession
@@ -61,7 +68,7 @@ public class JavaBucketedRandomProjectionLSHExample {
 
     StructType schema = new StructType(new StructField[]{
       new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
-      new StructField("keys", new VectorUDT(), false, Metadata.empty())
+      new StructField("features", new VectorUDT(), false, Metadata.empty())
     });
     Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
     Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
@@ -71,26 +78,31 @@ public class JavaBucketedRandomProjectionLSHExample {
     BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
       .setBucketLength(2.0)
       .setNumHashTables(3)
-      .setInputCol("keys")
-      .setOutputCol("values");
+      .setInputCol("features")
+      .setOutputCol("hashes");
 
     BucketedRandomProjectionLSHModel model = mh.fit(dfA);
 
     // Feature Transformation
+    System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
     model.transform(dfA).show();
-    // Cache the transformed columns
-    Dataset<Row> transformedA = model.transform(dfA).cache();
-    Dataset<Row> transformedB = model.transform(dfB).cache();
 
-    // Approximate similarity join
-    model.approxSimilarityJoin(dfA, dfB, 1.5).show();
-    model.approxSimilarityJoin(transformedA, transformedB, 1.5).show();
-    // Self Join
-    model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show();
+    // Compute the locality sensitive hashes for the input rows, then perform approximate
+    // similarity join.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+    System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
+    model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
+      .select(col("datasetA.id").alias("idA"),
+        col("datasetB.id").alias("idB"),
+        col("EuclideanDistance")).show();
 
-    // Approximate nearest neighbor search
+    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    // neighbor search.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxNearestNeighbors(transformedA, key, 2)`
+    System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
     model.approxNearestNeighbors(dfA, key, 2).show();
-    model.approxNearestNeighbors(transformedA, key, 2).show();
     // $example off$
 
     spark.stop();

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
index 9dbbf6d..0aace46 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.spark.ml.feature.MinHashLSH;
 import org.apache.spark.ml.feature.MinHashLSHModel;
+import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.ml.linalg.Vectors;
 import org.apache.spark.sql.Dataset;
@@ -34,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.functions.col;
 // $example off$
 
+/**
+ * An example demonstrating MinHashLSH.
+ * Run with:
+ *   bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample
+ */
 public class JavaMinHashLSHExample {
   public static void main(String[] args) {
     SparkSession spark = SparkSession
@@ -44,25 +52,58 @@ public class JavaMinHashLSHExample {
       .getOrCreate();
 
     // $example on$
-    List<Row> data = Arrays.asList(
+    List<Row> dataA = Arrays.asList(
       RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
       RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
       RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
     );
 
+    List<Row> dataB = Arrays.asList(
+      RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
+    );
+
     StructType schema = new StructType(new StructField[]{
       new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
-      new StructField("keys", new VectorUDT(), false, Metadata.empty())
+      new StructField("features", new VectorUDT(), false, Metadata.empty())
     });
-    Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
+    Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
+    Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
+
+    int[] indices = {1, 3};
+    double[] values = {1.0, 1.0};
+    Vector key = Vectors.sparse(6, indices, values);
 
     MinHashLSH mh = new MinHashLSH()
-      .setNumHashTables(1)
-      .setInputCol("keys")
-      .setOutputCol("values");
+      .setNumHashTables(5)
+      .setInputCol("features")
+      .setOutputCol("hashes");
+
+    MinHashLSHModel model = mh.fit(dfA);
+
+    // Feature Transformation
+    System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
+    model.transform(dfA).show();
+
+    // Compute the locality sensitive hashes for the input rows, then perform approximate
+    // similarity join.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+    System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
+    model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
+      .select(col("datasetA.id").alias("idA"),
+        col("datasetB.id").alias("idB"),
+        col("JaccardDistance")).show();
 
-    MinHashLSHModel model = mh.fit(dataFrame);
-    model.transform(dataFrame).show();
+    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    // neighbor search.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxNearestNeighbors(transformedA, key, 2)`
+    // It may return less than 2 rows when not enough approximate near-neighbor candidates are
+    // found.
+    System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
+    model.approxNearestNeighbors(dfA, key, 2).show();
     // $example off$
 
     spark.stop();

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
new file mode 100644
index 0000000..1b7a458
--- /dev/null
+++ b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.feature import BucketedRandomProjectionLSH
+from pyspark.ml.linalg import Vectors
+from pyspark.sql.functions import col
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating BucketedRandomProjectionLSH.
+Run with:
+  bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
+"""
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("BucketedRandomProjectionLSHExample") \
+        .getOrCreate()
+
+    # $example on$
+    dataA = [(0, Vectors.dense([1.0, 1.0]),),
+             (1, Vectors.dense([1.0, -1.0]),),
+             (2, Vectors.dense([-1.0, -1.0]),),
+             (3, Vectors.dense([-1.0, 1.0]),)]
+    dfA = spark.createDataFrame(dataA, ["id", "features"])
+
+    dataB = [(4, Vectors.dense([1.0, 0.0]),),
+             (5, Vectors.dense([-1.0, 0.0]),),
+             (6, Vectors.dense([0.0, 1.0]),),
+             (7, Vectors.dense([0.0, -1.0]),)]
+    dfB = spark.createDataFrame(dataB, ["id", "features"])
+
+    key = Vectors.dense([1.0, 0.0])
+
+    brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
+                                      numHashTables=3)
+    model = brp.fit(dfA)
+
+    # Feature Transformation
+    print("The hashed dataset where hashed values are stored in the column 'hashes':")
+    model.transform(dfA).show()
+
+    # Compute the locality sensitive hashes for the input rows, then perform approximate
+    # similarity join.
+    # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    # `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+    print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
+    model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
+        .select(col("datasetA.id").alias("idA"),
+                col("datasetB.id").alias("idB"),
+                col("EuclideanDistance")).show()
+
+    # Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    # neighbor search.
+    # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    # `model.approxNearestNeighbors(transformedA, key, 2)`
+    print("Approximately searching dfA for 2 nearest neighbors of the key:")
+    model.approxNearestNeighbors(dfA, key, 2).show()
+    # $example off$
+
+    spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/python/ml/min_hash_lsh_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/min_hash_lsh_example.py b/examples/src/main/python/ml/min_hash_lsh_example.py
new file mode 100644
index 0000000..7b1dd61
--- /dev/null
+++ b/examples/src/main/python/ml/min_hash_lsh_example.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.feature import MinHashLSH
+from pyspark.ml.linalg import Vectors
+from pyspark.sql.functions import col
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating MinHashLSH.
+Run with:
+  bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py
+"""
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("MinHashLSHExample") \
+        .getOrCreate()
+
+    # $example on$
+    dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+             (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+             (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+    dfA = spark.createDataFrame(dataA, ["id", "features"])
+
+    dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+             (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+             (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+    dfB = spark.createDataFrame(dataB, ["id", "features"])
+
+    key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
+
+    mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
+    model = mh.fit(dfA)
+
+    # Feature Transformation
+    print("The hashed dataset where hashed values are stored in the column 'hashes':")
+    model.transform(dfA).show()
+
+    # Compute the locality sensitive hashes for the input rows, then perform approximate
+    # similarity join.
+    # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    # `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+    print("Approximately joining dfA and dfB on distance smaller than 0.6:")
+    model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
+        .select(col("datasetA.id").alias("idA"),
+                col("datasetB.id").alias("idB"),
+                col("JaccardDistance")).show()
+
+    # Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    # neighbor search.
+    # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    # `model.approxNearestNeighbors(transformedA, key, 2)`
+    # It may return less than 2 rows when not enough approximate near-neighbor candidates are
+    # found.
+    print("Approximately searching dfA for 2 nearest neighbors of the key:")
+    model.approxNearestNeighbors(dfA, key, 2).show()
+
+    # $example off$
+
+    spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
index 686cc39..654535c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
@@ -21,9 +21,15 @@ package org.apache.spark.examples.ml
 // $example on$
 import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.functions.col
 // $example off$
 import org.apache.spark.sql.SparkSession
 
+/**
+ * An example demonstrating BucketedRandomProjectionLSH.
+ * Run with:
+ *   bin/run-example org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample
+ */
 object BucketedRandomProjectionLSHExample {
   def main(args: Array[String]): Unit = {
     // Creates a SparkSession
@@ -38,40 +44,45 @@ object BucketedRandomProjectionLSHExample {
       (1, Vectors.dense(1.0, -1.0)),
       (2, Vectors.dense(-1.0, -1.0)),
       (3, Vectors.dense(-1.0, 1.0))
-    )).toDF("id", "keys")
+    )).toDF("id", "features")
 
     val dfB = spark.createDataFrame(Seq(
       (4, Vectors.dense(1.0, 0.0)),
       (5, Vectors.dense(-1.0, 0.0)),
       (6, Vectors.dense(0.0, 1.0)),
       (7, Vectors.dense(0.0, -1.0))
-    )).toDF("id", "keys")
+    )).toDF("id", "features")
 
     val key = Vectors.dense(1.0, 0.0)
 
     val brp = new BucketedRandomProjectionLSH()
       .setBucketLength(2.0)
       .setNumHashTables(3)
-      .setInputCol("keys")
-      .setOutputCol("values")
+      .setInputCol("features")
+      .setOutputCol("hashes")
 
     val model = brp.fit(dfA)
 
     // Feature Transformation
+    println("The hashed dataset where hashed values are stored in the column 'hashes':")
     model.transform(dfA).show()
-    // Cache the transformed columns
-    val transformedA = model.transform(dfA).cache()
-    val transformedB = model.transform(dfB).cache()
 
-    // Approximate similarity join
-    model.approxSimilarityJoin(dfA, dfB, 1.5).show()
-    model.approxSimilarityJoin(transformedA, transformedB, 1.5).show()
-    // Self Join
-    model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show()
+    // Compute the locality sensitive hashes for the input rows, then perform approximate
+    // similarity join.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+    println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
+    model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
+      .select(col("datasetA.id").alias("idA"),
+        col("datasetB.id").alias("idB"),
+        col("EuclideanDistance")).show()
 
-    // Approximate nearest neighbor search
+    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    // neighbor search.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxNearestNeighbors(transformedA, key, 2)`
+    println("Approximately searching dfA for 2 nearest neighbors of the key:")
     model.approxNearestNeighbors(dfA, key, 2).show()
-    model.approxNearestNeighbors(transformedA, key, 2).show()
     // $example off$
 
     spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
index f4fc3cf..6c1e222 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
@@ -21,9 +21,15 @@ package org.apache.spark.examples.ml
 // $example on$
 import org.apache.spark.ml.feature.MinHashLSH
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.functions.col
 // $example off$
 import org.apache.spark.sql.SparkSession
 
+/**
+ * An example demonstrating MinHashLSH.
+ * Run with:
+ *   bin/run-example org.apache.spark.examples.ml.MinHashLSHExample
+ */
 object MinHashLSHExample {
   def main(args: Array[String]): Unit = {
     // Creates a SparkSession
@@ -37,38 +43,45 @@ object MinHashLSHExample {
       (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
       (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
       (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
-    )).toDF("id", "keys")
+    )).toDF("id", "features")
 
     val dfB = spark.createDataFrame(Seq(
       (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
       (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
       (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
-    )).toDF("id", "keys")
+    )).toDF("id", "features")
 
     val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
 
     val mh = new MinHashLSH()
-      .setNumHashTables(3)
-      .setInputCol("keys")
-      .setOutputCol("values")
+      .setNumHashTables(5)
+      .setInputCol("features")
+      .setOutputCol("hashes")
 
     val model = mh.fit(dfA)
 
     // Feature Transformation
+    println("The hashed dataset where hashed values are stored in the column 'hashes':")
     model.transform(dfA).show()
-    // Cache the transformed columns
-    val transformedA = model.transform(dfA).cache()
-    val transformedB = model.transform(dfB).cache()
 
-    // Approximate similarity join
-    model.approxSimilarityJoin(dfA, dfB, 0.6).show()
-    model.approxSimilarityJoin(transformedA, transformedB, 0.6).show()
-    // Self Join
-    model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show()
+    // Compute the locality sensitive hashes for the input rows, then perform approximate
+    // similarity join.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+    println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
+    model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
+      .select(col("datasetA.id").alias("idA"),
+        col("datasetB.id").alias("idB"),
+        col("JaccardDistance")).show()
 
-    // Approximate nearest neighbor search
+    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+    // neighbor search.
+    // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+    // `model.approxNearestNeighbors(transformedA, key, 2)`
+    // It may return less than 2 rows when not enough approximate near-neighbor candidates are
+    // found.
+    println("Approximately searching dfA for 2 nearest neighbors of the key:")
     model.approxNearestNeighbors(dfA, key, 2).show()
-    model.approxNearestNeighbors(transformedA, key, 2).show()
     // $example off$
 
     spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
index 309cc2e..1c9f47a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -222,7 +222,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
   }
 
   /**
-   * Join two dataset to approximately find all pairs of rows whose distance are smaller than
+   * Join two datasets to approximately find all pairs of rows whose distance are smaller than
    * the threshold. If the [[outputCol]] is missing, the method will transform the data; if the
    * [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed
    * data when necessary.
@@ -230,9 +230,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
    * @param datasetA One of the datasets to join.
    * @param datasetB Another dataset to join.
    * @param threshold The threshold for the distance of row pairs.
-   * @param distCol Output column for storing the distance between each result row and the key.
+   * @param distCol Output column for storing the distance between each pair of rows.
    * @return A joined dataset containing pairs of rows. The original rows are in columns
-   *         "datasetA" and "datasetB", and a distCol is added to show the distance of each pair.
+   *         "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+   *         between each pair.
    */
   def approxSimilarityJoin(
       datasetA: Dataset[_],

http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 1ab4291..c2eafbe 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,6 +28,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
 from pyspark.ml.common import inherit_doc
 
 __all__ = ['Binarizer',
+           'BucketedRandomProjectionLSH', 'BucketedRandomProjectionLSHModel',
            'Bucketizer',
            'ChiSqSelector', 'ChiSqSelectorModel',
            'CountVectorizer', 'CountVectorizerModel',
@@ -37,6 +38,7 @@ __all__ = ['Binarizer',
            'IDF', 'IDFModel',
            'IndexToString',
            'MaxAbsScaler', 'MaxAbsScalerModel',
+           'MinHashLSH', 'MinHashLSHModel',
            'MinMaxScaler', 'MinMaxScalerModel',
            'NGram',
            'Normalizer',
@@ -120,6 +122,196 @@ class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java
         return self.getOrDefault(self.threshold)
 
 
+class LSHParams(Params):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) algorithm parameters.
+    """
+
+    numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " +
+                          "increasing number of hash tables lowers the false negative rate, " +
+                          "and decreasing it improves the running performance.",
+                          typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(LSHParams, self).__init__()
+
+    def setNumHashTables(self, value):
+        """
+        Sets the value of :py:attr:`numHashTables`.
+        """
+        return self._set(numHashTables=value)
+
+    def getNumHashTables(self):
+        """
+        Gets the value of numHashTables or its default value.
+        """
+        return self.getOrDefault(self.numHashTables)
+
+
+class LSHModel(JavaModel):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) models.
+    """
+
+    def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"):
+        """
+        Given a large dataset and an item, approximately find at most k items which have the
+        closest distance to the item. If the :py:attr:`outputCol` is missing, the method will
+        transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows
+        caching of the transformed data when necessary.
+
+        .. note:: This method is experimental and will likely change behavior in the next release.
+
+        :param dataset: The dataset to search for nearest neighbors of the key.
+        :param key: Feature vector representing the item to search for.
+        :param numNearestNeighbors: The maximum number of nearest neighbors.
+        :param distCol: Output column for storing the distance between each result row and the key.
+                        Use "distCol" as default value if it's not specified.
+        :return: A dataset containing at most k items closest to the key. A column "distCol" is
+                 added to show the distance between each row and the key.
+        """
+        return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors,
+                               distCol)
+
+    def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"):
+        """
+        Join two datasets to approximately find all pairs of rows whose distance are smaller than
+        the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data;
+        if the :py:attr:`outputCol` exists, it will use that. This allows caching of the
+        transformed data when necessary.
+
+        :param datasetA: One of the datasets to join.
+        :param datasetB: Another dataset to join.
+        :param threshold: The threshold for the distance of row pairs.
+        :param distCol: Output column for storing the distance between each pair of rows. Use
+                        "distCol" as default value if it's not specified.
+        :return: A joined dataset containing pairs of rows. The original rows are in columns
+                 "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+                 between each pair.
+        """
+        return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol)
+
+
+@inherit_doc
+class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+                                  JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    LSH class for Euclidean distance metrics.
+    The input is dense or sparse vectors, each of which represents a point in the Euclidean
+    distance space. The output will be vectors of configurable dimension. Hash values in the same
+    dimension are calculated by the same hash function.
+
+    .. seealso:: `Stable Distributions \
+    <https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Stable_distributions>`_
+    .. seealso:: `Hashing for Similarity Search: A Survey <https://arxiv.org/abs/1408.2927>`_
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.sql.functions import col
+    >>> data = [(0, Vectors.dense([-1.0, -1.0 ]),),
+    ...         (1, Vectors.dense([-1.0, 1.0 ]),),
+    ...         (2, Vectors.dense([1.0, -1.0 ]),),
+    ...         (3, Vectors.dense([1.0, 1.0]),)]
+    >>> df = spark.createDataFrame(data, ["id", "features"])
+    >>> brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
+    ...                                   seed=12345, bucketLength=1.0)
+    >>> model = brp.fit(df)
+    >>> model.transform(df).head()
+    Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])])
+    >>> data2 = [(4, Vectors.dense([2.0, 2.0 ]),),
+    ...          (5, Vectors.dense([2.0, 3.0 ]),),
+    ...          (6, Vectors.dense([3.0, 2.0 ]),),
+    ...          (7, Vectors.dense([3.0, 3.0]),)]
+    >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+    >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect()
+    [Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)]
+    >>> model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select(
+    ...     col("datasetA.id").alias("idA"),
+    ...     col("datasetB.id").alias("idB"),
+    ...     col("EuclideanDistance")).show()
+    +---+---+-----------------+
+    |idA|idB|EuclideanDistance|
+    +---+---+-----------------+
+    |  3|  6| 2.23606797749979|
+    +---+---+-----------------+
+    ...
+    >>> brpPath = temp_path + "/brp"
+    >>> brp.save(brpPath)
+    >>> brp2 = BucketedRandomProjectionLSH.load(brpPath)
+    >>> brp2.getBucketLength() == brp.getBucketLength()
+    True
+    >>> modelPath = temp_path + "/brp-model"
+    >>> model.save(modelPath)
+    >>> model2 = BucketedRandomProjectionLSHModel.load(modelPath)
+    >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+    True
+
+    .. versionadded:: 2.2.0
+    """
+
+    bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket, " +
+                         "a larger bucket lowers the false negative rate.",
+                         typeConverter=TypeConverters.toFloat)
+
+    @keyword_only
+    def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+                 bucketLength=None):
+        """
+        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+                 bucketLength=None)
+        """
+        super(BucketedRandomProjectionLSH, self).__init__()
+        self._java_obj = \
+            self._new_java_obj("org.apache.spark.ml.feature.BucketedRandomProjectionLSH", self.uid)
+        self._setDefault(numHashTables=1)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+                  bucketLength=None):
+        """
+        setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+                  bucketLength=None)
+        Sets params for this BucketedRandomProjectionLSH.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.2.0")
+    def setBucketLength(self, value):
+        """
+        Sets the value of :py:attr:`bucketLength`.
+        """
+        return self._set(bucketLength=value)
+
+    @since("2.2.0")
+    def getBucketLength(self):
+        """
+        Gets the value of bucketLength or its default value.
+        """
+        return self.getOrDefault(self.bucketLength)
+
+    def _create_model(self, java_model):
+        return BucketedRandomProjectionLSHModel(java_model)
+
+
+class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Model fitted by :py:class:`BucketedRandomProjectionLSH`, where multiple random vectors are
+    stored. The vectors are normalized to be unit vectors and each vector is used in a hash
+    function: :math:`h_i(x) = floor(r_i \cdot x / bucketLength)` where :math:`r_i` is the
+    i-th random unit vector. The number of buckets will be `(max L2 norm of input vectors) /
+    bucketLength`.
+
+    .. versionadded:: 2.2.0
+    """
+
+
 @inherit_doc
 class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
     """
@@ -755,6 +947,105 @@ class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
 
 
 @inherit_doc
+class MinHashLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+                 JavaMLReadable, JavaMLWritable):
+
+    """
+    .. note:: Experimental
+
+    LSH class for Jaccard distance.
+    The input can be dense or sparse vectors, but it is more efficient if it is sparse.
+    For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` means there are 10 elements
+    in the space. This set contains elements 2, 3, and 5. Also, any input vector must have at
+    least 1 non-zero index, and all non-zero values are treated as binary "1" values.
+
+    .. seealso:: `Wikipedia on MinHash <https://en.wikipedia.org/wiki/MinHash>`_
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.sql.functions import col
+    >>> data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+    ...         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+    ...         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+    >>> df = spark.createDataFrame(data, ["id", "features"])
+    >>> mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
+    >>> model = mh.fit(df)
+    >>> model.transform(df).head()
+    Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0}), hashes=[DenseVector([-1638925...
+    >>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+    ...          (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+    ...          (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+    >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+    >>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0])
+    >>> model.approxNearestNeighbors(df2, key, 1).collect()
+    [Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}), hashes=[DenseVector([-163892...
+    >>> model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance").select(
+    ...     col("datasetA.id").alias("idA"),
+    ...     col("datasetB.id").alias("idB"),
+    ...     col("JaccardDistance")).show()
+    +---+---+---------------+
+    |idA|idB|JaccardDistance|
+    +---+---+---------------+
+    |  1|  4|            0.5|
+    |  0|  5|            0.5|
+    +---+---+---------------+
+    ...
+    >>> mhPath = temp_path + "/mh"
+    >>> mh.save(mhPath)
+    >>> mh2 = MinHashLSH.load(mhPath)
+    >>> mh2.getOutputCol() == mh.getOutputCol()
+    True
+    >>> modelPath = temp_path + "/mh-model"
+    >>> model.save(modelPath)
+    >>> model2 = MinHashLSHModel.load(modelPath)
+    >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+    True
+
+    .. versionadded:: 2.2.0
+    """
+
+    @keyword_only
+    def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+        """
+        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+        """
+        super(MinHashLSH, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHashLSH", self.uid)
+        self._setDefault(numHashTables=1)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+        """
+        setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+        Sets params for this MinHashLSH.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    def _create_model(self, java_model):
+        return MinHashLSHModel(java_model)
+
+
+class MinHashLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Model produced by :py:class:`MinHashLSH`, where multiple hash functions are stored. Each
+    hash function is picked from the following family of hash functions, where :math:`a_i` and
+    :math:`b_i` are randomly chosen integers less than prime:
+    :math:`h_i(x) = ((x \cdot a_i + b_i) \mod prime)` This hash family is approximately min-wise
+    independent according to the reference.
+
+    .. seealso:: Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear \
+    permutations." Electronic Journal of Combinatorics 7 (2000): R26.
+
+    .. versionadded:: 2.2.0
+    """
+
+
+@inherit_doc
 class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
     """
     Rescale each feature individually to a common range [min, max] linearly using column summary

