Posted to user@spark.apache.org by "Stevens, Clay" <Cl...@wolterskluwer.com> on 2019/09/23 20:20:03 UTC

Efficient cosine similarity computation

There are several ways I can compute the cosine similarity between a Spark ML vector and each ML vector in a Spark DataFrame column, then sort for the highest results.  However, I can't come up with a method that is faster than replacing the `/data/` in a Spark ML Word2Vec model and then using `.findSynonyms()`.  The problem is that the Word2Vec model is held entirely in the driver, which can cause memory issues if the data set I want to compare against gets too big.

1. Is there a more efficient method than the ones I have shown below?
2. Could the data for the Word2Vec model be distributed across the cluster?
3. Could the `.findSynonyms()` [Scala code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L571toL619) be modified into a Spark SQL function that can operate efficiently over a whole Spark DataFrame? (A rough sketch of what that might look like follows below.)
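
For reference, here is a rough sketch of what a DataFrame-side version of that lookup might look like. This is only an illustration (not taken from the Spark source), and it assumes the word/vector pairs are in a DataFrame `df` with a 'vectors' column and that `vecIn` is the query vector:
```

    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    # Broadcast the query vector and score every row, then keep the top
    # matches -- roughly what .findSynonyms() does, but expressed over a
    # DataFrame instead of the driver-side model.
    bc_vecIn = spark.sparkContext.broadcast(vecIn)

    @F.udf(DoubleType())
    def cos_sim(v):
        q = bc_vecIn.value
        return float(v.dot(q) / (v.norm(2) * q.norm(2)))

    df.withColumn("CosSim", cos_sim("vectors")) \
      .orderBy(F.desc("CosSim")) \
      .limit(20) \
      .show(truncate=False)
```
Broadcasting the query vector means it is shipped to each executor once rather than with every task.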


Methods I have tried:

#1 rdd function:
```

    import numpy as np

    # vecIn = vector of same dimensions as 'vectors' column
    def cosSim(row, vecIn):
        # cosine similarity = dot(a, b) / (||a|| * ||b||), returned as a
        # one-element tuple so it converts cleanly to a one-column DataFrame
        return (float(row.vectors.dot(vecIn) /
                      (np.sqrt(row.vectors.dot(row.vectors)) *
                       np.sqrt(vecIn.dot(vecIn)))),)

    df.rdd.map(lambda row: cosSim(row, vecIn)).toDF(['CosSim']).show(truncate=False)
```
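
As a small variation, if the DataFrame also has a `word` column (as in method #3 below), the same function can keep the word next to its score and do the sort mentioned above:
```

    # Keep the word alongside its score, then sort for the highest results.
    df.rdd.map(lambda row: (row.word, cosSim(row, vecIn)[0])) \
      .toDF(['word', 'CosSim']) \
      .orderBy('CosSim', ascending=False) \
      .show(truncate=False)
```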

#2  `.toIndexedRowMatrix().columnSimilarities()` then filter the results (not shown):

```

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix

    # IndexedRowMatrix expects (index, vector) rows, so attach an index first
    spark.createDataFrame(
        IndexedRowMatrix(
            df.rdd.map(lambda row: row.vectors.toArray())
                  .zipWithIndex()
                  .map(lambda x: (x[1], x[0])))
        .toBlockMatrix()
        .transpose()
        .toIndexedRowMatrix()
        .columnSimilarities()
        .entries)
```
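
For illustration, a filter over those entries could look like this (assuming the DataFrame built above is bound to `sims` and the query vector was appended to `df` at a known row index `q_idx`; both names are assumptions):
```

    from pyspark.sql import functions as F

    q_idx = 0  # row index where the query vector was appended (assumption)

    # sims has columns i, j, value (from the MatrixEntry fields).
    # columnSimilarities() only returns the upper triangle, so the query
    # index can appear in either i or j.
    sims.filter((F.col('i') == q_idx) | (F.col('j') == q_idx)) \
        .orderBy(F.desc('value')) \
        .show(truncate=False)
```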

#3 replace the Word2Vec model's `/data/` with my own, then load the 'revised' model and use `.findSynonyms()`:
```
    from pyspark.ml.feature import Word2VecModel

    df_words_vectors.schema
    ## StructType(List(StructField(word,StringType,true),StructField(vector,ArrayType(FloatType,true),true)))

    df_words_vectors.write.parquet("exiting_Word2Vec_model/data/", mode='overwrite')

    new_Word2Vec_model = Word2VecModel.load("exiting_Word2Vec_model")

    ## vecIn = vector of same dimensions as 'vector' column in DataFrame saved over Word2Vec model /data/
    new_Word2Vec_model.findSynonyms(vecIn, 20).show()
```


Clay Stevens

Re: Efficient cosine similarity computation

Posted by Chee Yee Lim <ch...@gmail.com>.
I've been trying to achieve the same objective, coming up with approaches
similar to your methods 1 and 2. Method 2 is the slowest for me due to the
massive amount of data being shuffled around at each matrix operation
stage. Method 3 is new to me, so I can't comment much.

I ended up using an approach that is similar to your method 1, which gives
reasonable performance in my use case.

*#4 Normalizer then UDF (PySpark code)*
```
from pyspark.ml.feature import Normalizer
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Normalizer defaults to the L2 norm, so the dot product of two
# normalised vectors is their cosine similarity.
normaliser = Normalizer(inputCol="vec", outputCol="norm_vec")
df_word_norm = normaliser.transform(df_word)

dot_udf = F.udf(lambda x, y: float(x.dot(y)), DoubleType())
df_score = df_word_norm.withColumn(
    "score", dot_udf(df_word_norm.norm_vec1, df_word_norm.norm_vec2))
# norm_vec1 and norm_vec2 come from a Cartesian join. Steps to produce
# them are not shown for brevity.
```
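
Roughly, the Cartesian join that produces norm_vec1 and norm_vec2 would
look like this (a simplified sketch; it assumes a `word` column and the
exact column handling may differ):
```
# Pair every normalised vector with every other one via a cross join,
# renaming the columns so both sides can be kept in one row.
left = df_word_norm.select(F.col("word").alias("word1"),
                           F.col("norm_vec").alias("norm_vec1"))
right = df_word_norm.select(F.col("word").alias("word2"),
                            F.col("norm_vec").alias("norm_vec2"))
df_pairs = left.crossJoin(right)

df_score = df_pairs.withColumn("score",
                               dot_udf(df_pairs.norm_vec1, df_pairs.norm_vec2))
```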

Would be curious to learn how other people solve this problem.

Best wishes,
Chee Yee
