Posted to commits@spark.apache.org by yl...@apache.org on 2017/02/16 00:26:08 UTC
spark git commit: [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing
Repository: spark
Updated Branches:
refs/heads/master 21b4ba2d6 -> 08c1972a0
[SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing
## What changes were proposed in this pull request?
This pull request adds the Python API and examples for LSH. The API changes are based on yanboliang's PR #15768, with conflicts resolved and the API brought in line with changes on the Scala side. The examples are consistent with the Scala examples for MinHashLSH and BucketedRandomProjectionLSH.
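For readers who want the shape of the new Python API without stepping through the full diff, below is a minimal sketch condensed from the MinHashLSH example file added in this patch (the dataset, column names, and parameters all come from that example; it is an illustration, not an additional test):

```python
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MinHashLSHSketch").getOrCreate()

# Sparse binary vectors; all non-zero values are treated as "1"s.
dfA = spark.createDataFrame(
    [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
     (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]))],
    ["id", "features"])
dfB = spark.createDataFrame(
    [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]))],
    ["id", "features"])

# More hash tables lower the false negative rate at some cost in speed.
model = MinHashLSH(inputCol="features", outputCol="hashes",
                   numHashTables=5).fit(dfA)

# Approximate join: keep pairs whose Jaccard distance is below 0.6.
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance") \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Approximate k-nearest-neighbor search for a query vector.
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
model.approxNearestNeighbors(dfA, key, 2).show()

spark.stop()
```

BucketedRandomProjectionLSH follows the same pattern, with an extra `bucketLength` parameter and Euclidean distance in place of Jaccard distance.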
## How was this patch tested?
The API and examples were tested using spark-submit:
`bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py`
`bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py`
User guide changes were generated and manually inspected:
`SKIP_API=1 jekyll build`
Author: Yun Ni <yu...@uber.com>
Author: Yanbo Liang <yb...@gmail.com>
Author: Yunni <Eu...@gmail.com>
Closes #16715 from Yunni/spark-18080.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08c1972a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08c1972a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08c1972a
Branch: refs/heads/master
Commit: 08c1972a0661d42f300520cc6e5fb31023de093b
Parents: 21b4ba2
Author: Yun Ni <yu...@uber.com>
Authored: Wed Feb 15 16:26:05 2017 -0800
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Wed Feb 15 16:26:05 2017 -0800
----------------------------------------------------------------------
docs/ml-features.md | 17 ++
.../JavaBucketedRandomProjectionLSHExample.java | 38 ++-
.../examples/ml/JavaMinHashLSHExample.java | 57 +++-
.../bucketed_random_projection_lsh_example.py | 81 ++++++
.../src/main/python/ml/min_hash_lsh_example.py | 81 ++++++
.../ml/BucketedRandomProjectionLSHExample.scala | 39 ++-
.../spark/examples/ml/MinHashLSHExample.scala | 43 ++-
.../scala/org/apache/spark/ml/feature/LSH.scala | 7 +-
python/pyspark/ml/feature.py | 291 +++++++++++++++++++
9 files changed, 601 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/docs/ml-features.md
----------------------------------------------------------------------
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 13d97a2..57605ba 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1558,6 +1558,15 @@ for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java %}
</div>
+
+<div data-lang="python" markdown="1">
+
+Refer to the [BucketedRandomProjectionLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.BucketedRandomProjectionLSH)
+for more details on the API.
+
+{% include_example python/ml/bucketed_random_projection_lsh_example.py %}
+</div>
+
</div>
### MinHash for Jaccard Distance
@@ -1590,4 +1599,12 @@ for more details on the API.
{% include_example java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java %}
</div>
+
+<div data-lang="python" markdown="1">
+
+Refer to the [MinHashLSH Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.MinHashLSH)
+for more details on the API.
+
+{% include_example python/ml/min_hash_lsh_example.py %}
+</div>
</div>
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
index ca3ee5a..4594e34 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
@@ -35,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.functions.col;
// $example off$
+/**
+ * An example demonstrating BucketedRandomProjectionLSH.
+ * Run with:
+ * bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample
+ */
public class JavaBucketedRandomProjectionLSHExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
@@ -61,7 +68,7 @@ public class JavaBucketedRandomProjectionLSHExample {
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
- new StructField("keys", new VectorUDT(), false, Metadata.empty())
+ new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
@@ -71,26 +78,31 @@ public class JavaBucketedRandomProjectionLSHExample {
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
- .setInputCol("keys")
- .setOutputCol("values");
+ .setInputCol("features")
+ .setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
+ System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
- // Cache the transformed columns
- Dataset<Row> transformedA = model.transform(dfA).cache();
- Dataset<Row> transformedB = model.transform(dfB).cache();
- // Approximate similarity join
- model.approxSimilarityJoin(dfA, dfB, 1.5).show();
- model.approxSimilarityJoin(transformedA, transformedB, 1.5).show();
- // Self Join
- model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show();
+ // Compute the locality sensitive hashes for the input rows, then perform approximate
+ // similarity join.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+ System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
+ model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("EuclideanDistance")).show();
- // Approximate nearest neighbor search
+ // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ // neighbor search.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxNearestNeighbors(transformedA, key, 2)`
+ System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
- model.approxNearestNeighbors(transformedA, key, 2).show();
// $example off$
spark.stop();
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
index 9dbbf6d..0aace46 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
+import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
@@ -34,8 +35,15 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.functions.col;
// $example off$
+/**
+ * An example demonstrating MinHashLSH.
+ * Run with:
+ * bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample
+ */
public class JavaMinHashLSHExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
@@ -44,25 +52,58 @@ public class JavaMinHashLSHExample {
.getOrCreate();
// $example on$
- List<Row> data = Arrays.asList(
+ List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
+ List<Row> dataB = Arrays.asList(
+ RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
+ RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
+ RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
+ );
+
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
- new StructField("keys", new VectorUDT(), false, Metadata.empty())
+ new StructField("features", new VectorUDT(), false, Metadata.empty())
});
- Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
+ Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
+ Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
+
+ int[] indices = {1, 3};
+ double[] values = {1.0, 1.0};
+ Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
- .setNumHashTables(1)
- .setInputCol("keys")
- .setOutputCol("values");
+ .setNumHashTables(5)
+ .setInputCol("features")
+ .setOutputCol("hashes");
+
+ MinHashLSHModel model = mh.fit(dfA);
+
+ // Feature Transformation
+ System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
+ model.transform(dfA).show();
+
+ // Compute the locality sensitive hashes for the input rows, then perform approximate
+ // similarity join.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+ System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
+ model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("JaccardDistance")).show();
- MinHashLSHModel model = mh.fit(dataFrame);
- model.transform(dataFrame).show();
+ // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ // neighbor search.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxNearestNeighbors(transformedA, key, 2)`
+ // It may return fewer than 2 rows when not enough approximate near-neighbor candidates are
+ // found.
+ System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
+ model.approxNearestNeighbors(dfA, key, 2).show();
// $example off$
spark.stop();
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
new file mode 100644
index 0000000..1b7a458
--- /dev/null
+++ b/examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.feature import BucketedRandomProjectionLSH
+from pyspark.ml.linalg import Vectors
+from pyspark.sql.functions import col
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating BucketedRandomProjectionLSH.
+Run with:
+ bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
+"""
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("BucketedRandomProjectionLSHExample") \
+ .getOrCreate()
+
+ # $example on$
+ dataA = [(0, Vectors.dense([1.0, 1.0]),),
+ (1, Vectors.dense([1.0, -1.0]),),
+ (2, Vectors.dense([-1.0, -1.0]),),
+ (3, Vectors.dense([-1.0, 1.0]),)]
+ dfA = spark.createDataFrame(dataA, ["id", "features"])
+
+ dataB = [(4, Vectors.dense([1.0, 0.0]),),
+ (5, Vectors.dense([-1.0, 0.0]),),
+ (6, Vectors.dense([0.0, 1.0]),),
+ (7, Vectors.dense([0.0, -1.0]),)]
+ dfB = spark.createDataFrame(dataB, ["id", "features"])
+
+ key = Vectors.dense([1.0, 0.0])
+
+ brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
+ numHashTables=3)
+ model = brp.fit(dfA)
+
+ # Feature Transformation
+ print("The hashed dataset where hashed values are stored in the column 'hashes':")
+ model.transform(dfA).show()
+
+ # Compute the locality sensitive hashes for the input rows, then perform approximate
+ # similarity join.
+ # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ # `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+ print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
+ model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("EuclideanDistance")).show()
+
+ # Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ # neighbor search.
+ # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ # `model.approxNearestNeighbors(transformedA, key, 2)`
+ print("Approximately searching dfA for 2 nearest neighbors of the key:")
+ model.approxNearestNeighbors(dfA, key, 2).show()
+ # $example off$
+
+ spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/python/ml/min_hash_lsh_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/min_hash_lsh_example.py b/examples/src/main/python/ml/min_hash_lsh_example.py
new file mode 100644
index 0000000..7b1dd61
--- /dev/null
+++ b/examples/src/main/python/ml/min_hash_lsh_example.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.feature import MinHashLSH
+from pyspark.ml.linalg import Vectors
+from pyspark.sql.functions import col
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating MinHashLSH.
+Run with:
+ bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py
+"""
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("MinHashLSHExample") \
+ .getOrCreate()
+
+ # $example on$
+ dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+ (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+ (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+ dfA = spark.createDataFrame(dataA, ["id", "features"])
+
+ dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+ (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+ (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+ dfB = spark.createDataFrame(dataB, ["id", "features"])
+
+ key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
+
+ mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
+ model = mh.fit(dfA)
+
+ # Feature Transformation
+ print("The hashed dataset where hashed values are stored in the column 'hashes':")
+ model.transform(dfA).show()
+
+ # Compute the locality sensitive hashes for the input rows, then perform approximate
+ # similarity join.
+ # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ # `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+ print("Approximately joining dfA and dfB on distance smaller than 0.6:")
+ model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("JaccardDistance")).show()
+
+ # Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ # neighbor search.
+ # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ # `model.approxNearestNeighbors(transformedA, key, 2)`
+ # It may return fewer than 2 rows when not enough approximate near-neighbor candidates are
+ # found.
+ print("Approximately searching dfA for 2 nearest neighbors of the key:")
+ model.approxNearestNeighbors(dfA, key, 2).show()
+
+ # $example off$
+
+ spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
index 686cc39..654535c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
@@ -21,9 +21,15 @@ package org.apache.spark.examples.ml
// $example on$
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.functions.col
// $example off$
import org.apache.spark.sql.SparkSession
+/**
+ * An example demonstrating BucketedRandomProjectionLSH.
+ * Run with:
+ * bin/run-example org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample
+ */
object BucketedRandomProjectionLSHExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession
@@ -38,40 +44,45 @@ object BucketedRandomProjectionLSHExample {
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
- )).toDF("id", "keys")
+ )).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
- )).toDF("id", "keys")
+ )).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
- .setInputCol("keys")
- .setOutputCol("values")
+ .setInputCol("features")
+ .setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
+ println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
- // Cache the transformed columns
- val transformedA = model.transform(dfA).cache()
- val transformedB = model.transform(dfB).cache()
- // Approximate similarity join
- model.approxSimilarityJoin(dfA, dfB, 1.5).show()
- model.approxSimilarityJoin(transformedA, transformedB, 1.5).show()
- // Self Join
- model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show()
+ // Compute the locality sensitive hashes for the input rows, then perform approximate
+ // similarity join.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
+ println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
+ model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("EuclideanDistance")).show()
- // Approximate nearest neighbor search
+ // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ // neighbor search.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxNearestNeighbors(transformedA, key, 2)`
+ println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
- model.approxNearestNeighbors(transformedA, key, 2).show()
// $example off$
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
index f4fc3cf..6c1e222 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
@@ -21,9 +21,15 @@ package org.apache.spark.examples.ml
// $example on$
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.functions.col
// $example off$
import org.apache.spark.sql.SparkSession
+/**
+ * An example demonstrating MinHashLSH.
+ * Run with:
+ * bin/run-example org.apache.spark.examples.ml.MinHashLSHExample
+ */
object MinHashLSHExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession
@@ -37,38 +43,45 @@ object MinHashLSHExample {
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
- )).toDF("id", "keys")
+ )).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
- )).toDF("id", "keys")
+ )).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
- .setNumHashTables(3)
- .setInputCol("keys")
- .setOutputCol("values")
+ .setNumHashTables(5)
+ .setInputCol("features")
+ .setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
+ println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
- // Cache the transformed columns
- val transformedA = model.transform(dfA).cache()
- val transformedB = model.transform(dfB).cache()
- // Approximate similarity join
- model.approxSimilarityJoin(dfA, dfB, 0.6).show()
- model.approxSimilarityJoin(transformedA, transformedB, 0.6).show()
- // Self Join
- model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show()
+ // Compute the locality sensitive hashes for the input rows, then perform approximate
+ // similarity join.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
+ println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
+ model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
+ .select(col("datasetA.id").alias("idA"),
+ col("datasetB.id").alias("idB"),
+ col("JaccardDistance")).show()
- // Approximate nearest neighbor search
+ // Compute the locality sensitive hashes for the input rows, then perform approximate nearest
+ // neighbor search.
+ // We could avoid computing hashes by passing in the already-transformed dataset, e.g.
+ // `model.approxNearestNeighbors(transformedA, key, 2)`
+ // It may return fewer than 2 rows when not enough approximate near-neighbor candidates are
+ // found.
+ println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
- model.approxNearestNeighbors(transformedA, key, 2).show()
// $example off$
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
index 309cc2e..1c9f47a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -222,7 +222,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
}
/**
- * Join two dataset to approximately find all pairs of rows whose distance are smaller than
+ * Join two datasets to approximately find all pairs of rows whose distance is smaller than
* the threshold. If the [[outputCol]] is missing, the method will transform the data; if the
* [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed
* data when necessary.
@@ -230,9 +230,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
* @param datasetA One of the datasets to join.
* @param datasetB Another dataset to join.
* @param threshold The threshold for the distance of row pairs.
- * @param distCol Output column for storing the distance between each result row and the key.
+ * @param distCol Output column for storing the distance between each pair of rows.
* @return A joined dataset containing pairs of rows. The original rows are in columns
- * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair.
+ * "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+ * between each pair.
*/
def approxSimilarityJoin(
datasetA: Dataset[_],
http://git-wip-us.apache.org/repos/asf/spark/blob/08c1972a/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 1ab4291..c2eafbe 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,6 +28,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
from pyspark.ml.common import inherit_doc
__all__ = ['Binarizer',
+ 'BucketedRandomProjectionLSH', 'BucketedRandomProjectionLSHModel',
'Bucketizer',
'ChiSqSelector', 'ChiSqSelectorModel',
'CountVectorizer', 'CountVectorizerModel',
@@ -37,6 +38,7 @@ __all__ = ['Binarizer',
'IDF', 'IDFModel',
'IndexToString',
'MaxAbsScaler', 'MaxAbsScalerModel',
+ 'MinHashLSH', 'MinHashLSHModel',
'MinMaxScaler', 'MinMaxScalerModel',
'NGram',
'Normalizer',
@@ -120,6 +122,196 @@ class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java
return self.getOrDefault(self.threshold)
+class LSHParams(Params):
+ """
+ Mixin for Locality Sensitive Hashing (LSH) algorithm parameters.
+ """
+
+ numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " +
+ "increasing number of hash tables lowers the false negative rate, " +
+ "and decreasing it improves the running performance.",
+ typeConverter=TypeConverters.toInt)
+
+ def __init__(self):
+ super(LSHParams, self).__init__()
+
+ def setNumHashTables(self, value):
+ """
+ Sets the value of :py:attr:`numHashTables`.
+ """
+ return self._set(numHashTables=value)
+
+ def getNumHashTables(self):
+ """
+ Gets the value of numHashTables or its default value.
+ """
+ return self.getOrDefault(self.numHashTables)
+
+
+class LSHModel(JavaModel):
+ """
+ Mixin for Locality Sensitive Hashing (LSH) models.
+ """
+
+ def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"):
+ """
+ Given a large dataset and an item, approximately find at most k items which have the
+ closest distance to the item. If the :py:attr:`outputCol` is missing, the method will
+ transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows
+ caching of the transformed data when necessary.
+
+ .. note:: This method is experimental and will likely change behavior in the next release.
+
+ :param dataset: The dataset to search for nearest neighbors of the key.
+ :param key: Feature vector representing the item to search for.
+ :param numNearestNeighbors: The maximum number of nearest neighbors.
+ :param distCol: Output column for storing the distance between each result row and the key.
+ Use "distCol" as default value if it's not specified.
+ :return: A dataset containing at most k items closest to the key. A column "distCol" is
+ added to show the distance between each row and the key.
+ """
+ return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors,
+ distCol)
+
+ def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"):
+ """
+ Join two datasets to approximately find all pairs of rows whose distance is smaller than
+ the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data;
+ if the :py:attr:`outputCol` exists, it will use that. This allows caching of the
+ transformed data when necessary.
+
+ :param datasetA: One of the datasets to join.
+ :param datasetB: Another dataset to join.
+ :param threshold: The threshold for the distance of row pairs.
+ :param distCol: Output column for storing the distance between each pair of rows. Use
+ "distCol" as default value if it's not specified.
+ :return: A joined dataset containing pairs of rows. The original rows are in columns
+ "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+ between each pair.
+ """
+ return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol)
+
+
+@inherit_doc
+class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+ JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ LSH class for Euclidean distance metrics.
+ The input is dense or sparse vectors, each of which represents a point in the Euclidean
+ distance space. The output will be vectors of configurable dimension. Hash values in the same
+ dimension are calculated by the same hash function.
+
+ .. seealso:: `Stable Distributions \
+ <https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Stable_distributions>`_
+ .. seealso:: `Hashing for Similarity Search: A Survey <https://arxiv.org/abs/1408.2927>`_
+
+ >>> from pyspark.ml.linalg import Vectors
+ >>> from pyspark.sql.functions import col
+ >>> data = [(0, Vectors.dense([-1.0, -1.0 ]),),
+ ... (1, Vectors.dense([-1.0, 1.0 ]),),
+ ... (2, Vectors.dense([1.0, -1.0 ]),),
+ ... (3, Vectors.dense([1.0, 1.0]),)]
+ >>> df = spark.createDataFrame(data, ["id", "features"])
+ >>> brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
+ ... seed=12345, bucketLength=1.0)
+ >>> model = brp.fit(df)
+ >>> model.transform(df).head()
+ Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])])
+ >>> data2 = [(4, Vectors.dense([2.0, 2.0 ]),),
+ ... (5, Vectors.dense([2.0, 3.0 ]),),
+ ... (6, Vectors.dense([3.0, 2.0 ]),),
+ ... (7, Vectors.dense([3.0, 3.0]),)]
+ >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+ >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect()
+ [Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)]
+ >>> model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select(
+ ... col("datasetA.id").alias("idA"),
+ ... col("datasetB.id").alias("idB"),
+ ... col("EuclideanDistance")).show()
+ +---+---+-----------------+
+ |idA|idB|EuclideanDistance|
+ +---+---+-----------------+
+ | 3| 6| 2.23606797749979|
+ +---+---+-----------------+
+ ...
+ >>> brpPath = temp_path + "/brp"
+ >>> brp.save(brpPath)
+ >>> brp2 = BucketedRandomProjectionLSH.load(brpPath)
+ >>> brp2.getBucketLength() == brp.getBucketLength()
+ True
+ >>> modelPath = temp_path + "/brp-model"
+ >>> model.save(modelPath)
+ >>> model2 = BucketedRandomProjectionLSHModel.load(modelPath)
+ >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+ True
+
+ .. versionadded:: 2.2.0
+ """
+
+ bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket, " +
+ "a larger bucket lowers the false negative rate.",
+ typeConverter=TypeConverters.toFloat)
+
+ @keyword_only
+ def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+ bucketLength=None):
+ """
+ __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+ bucketLength=None)
+ """
+ super(BucketedRandomProjectionLSH, self).__init__()
+ self._java_obj = \
+ self._new_java_obj("org.apache.spark.ml.feature.BucketedRandomProjectionLSH", self.uid)
+ self._setDefault(numHashTables=1)
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.2.0")
+ def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+ bucketLength=None):
+ """
+ setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+ bucketLength=None)
+ Sets params for this BucketedRandomProjectionLSH.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.2.0")
+ def setBucketLength(self, value):
+ """
+ Sets the value of :py:attr:`bucketLength`.
+ """
+ return self._set(bucketLength=value)
+
+ @since("2.2.0")
+ def getBucketLength(self):
+ """
+ Gets the value of bucketLength or its default value.
+ """
+ return self.getOrDefault(self.bucketLength)
+
+ def _create_model(self, java_model):
+ return BucketedRandomProjectionLSHModel(java_model)
+
+
+class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Model fitted by :py:class:`BucketedRandomProjectionLSH`, where multiple random vectors are
+ stored. The vectors are normalized to be unit vectors and each vector is used in a hash
+ function: :math:`h_i(x) = floor(r_i \cdot x / bucketLength)` where :math:`r_i` is the
+ i-th random unit vector. The number of buckets will be `(max L2 norm of input vectors) /
+ bucketLength`.
+
+ .. versionadded:: 2.2.0
+ """
+
+
@inherit_doc
class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
@@ -755,6 +947,105 @@ class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
@inherit_doc
+class MinHashLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+ JavaMLReadable, JavaMLWritable):
+
+ """
+ .. note:: Experimental
+
+ LSH class for Jaccard distance.
+ The input can be dense or sparse vectors, but it is more efficient if it is sparse.
+ For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` means there are 10 elements
+ in the space. This set contains elements 2, 3, and 5. Also, any input vector must have at
+ least 1 non-zero index, and all non-zero values are treated as binary "1" values.
+
+ .. seealso:: `Wikipedia on MinHash <https://en.wikipedia.org/wiki/MinHash>`_
+
+ >>> from pyspark.ml.linalg import Vectors
+ >>> from pyspark.sql.functions import col
+ >>> data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+ ... (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+ ... (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+ >>> df = spark.createDataFrame(data, ["id", "features"])
+ >>> mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
+ >>> model = mh.fit(df)
+ >>> model.transform(df).head()
+ Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0}), hashes=[DenseVector([-1638925...
+ >>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+ ... (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+ ... (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+ >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+ >>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0])
+ >>> model.approxNearestNeighbors(df2, key, 1).collect()
+ [Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}), hashes=[DenseVector([-163892...
+ >>> model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance").select(
+ ... col("datasetA.id").alias("idA"),
+ ... col("datasetB.id").alias("idB"),
+ ... col("JaccardDistance")).show()
+ +---+---+---------------+
+ |idA|idB|JaccardDistance|
+ +---+---+---------------+
+ | 1| 4| 0.5|
+ | 0| 5| 0.5|
+ +---+---+---------------+
+ ...
+ >>> mhPath = temp_path + "/mh"
+ >>> mh.save(mhPath)
+ >>> mh2 = MinHashLSH.load(mhPath)
+ >>> mh2.getOutputCol() == mh.getOutputCol()
+ True
+ >>> modelPath = temp_path + "/mh-model"
+ >>> model.save(modelPath)
+ >>> model2 = MinHashLSHModel.load(modelPath)
+ >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+ True
+
+ .. versionadded:: 2.2.0
+ """
+
+ @keyword_only
+ def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+ """
+ __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+ """
+ super(MinHashLSH, self).__init__()
+ self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHashLSH", self.uid)
+ self._setDefault(numHashTables=1)
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.2.0")
+ def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+ """
+ setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+ Sets params for this MinHashLSH.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ def _create_model(self, java_model):
+ return MinHashLSHModel(java_model)
+
+
+class MinHashLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Model produced by :py:class:`MinHashLSH`, where multiple hash functions are stored. Each
+ hash function is picked from the following family of hash functions, where :math:`a_i` and
+ :math:`b_i` are randomly chosen integers less than prime:
+ :math:`h_i(x) = ((x \cdot a_i + b_i) \mod prime)` This hash family is approximately min-wise
+ independent according to the reference.
+
+ .. seealso:: Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear \
+ permutations." Electronic Journal of Combinatorics 7 (2000): R26.
+
+ .. versionadded:: 2.2.0
+ """
+
+
+@inherit_doc
class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
Rescale each feature individually to a common range [min, max] linearly using column summary