Posted to user@spark.apache.org by Meeraj Kunnumpurath <meeraj@servicesymphony.com> on 2016/11/13 15:04:07 UTC

Nearest neighbour search

Hello,

I have a dataset containing TF-IDF vectors for a corpus of documents. How
do I perform a nearest neighbour search on the dataset, using cosine
similarity?

  import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

  // Headerless CSV; _c0 holds the document URI and _c2 the text
  // (as used later in this thread)
  val df = spark.read.option("header", "false").csv("data")

  // Tokenise the text, hash the tokens into term-frequency vectors,
  // then rescale by inverse document frequency
  val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
  val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
  val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")

  val df1 = tf.transform(tk.transform(df))
  idf.fit(df1).transform(df1).select("tf-idf").show(10)
Thank you

-- 
*Meeraj Kunnumpurath*
*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*meeraj@servicesymphony.com*

Re: Nearest neighbour search

Posted by Nick Pentreath <ni...@gmail.com>.
LSH-based NN search and similarity join should be out in Spark 2.1 -
there's still a little work being done to clean up the APIs and some
functionality.

Check out https://issues.apache.org/jira/browse/SPARK-5992
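
For reference, a minimal sketch against the LSH API in that ticket as it
stands for Spark 2.1 (column names follow the snippets in this thread; note
that BucketedRandomProjectionLSH approximates Euclidean distance, not
cosine, so unit-normalising the TF-IDF vectors first, e.g. with
ml.feature.Normalizer, makes the two rankings agree):

  import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
  import org.apache.spark.ml.linalg.Vector

  val brp = new BucketedRandomProjectionLSH()
    .setInputCol("tf-idf")
    .setOutputCol("hashes")
    .setBucketLength(2.0)   // tuning parameter; 2.0 is just a starting point
    .setNumHashTables(3)    // more tables improve recall at more cost

  val model = brp.fit(idfs)

  // Approximate top-10 neighbours of a single query vector
  val key = idfs.filter("_c0 == '<some uri>'").head.getAs[Vector]("tf-idf")
  model.approxNearestNeighbors(idfs, key, 10).show()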


Re: Nearest neighbour search

Posted by Kevin Mellott <ke...@gmail.com>.
You may be able to benefit from SoundCloud's open-source implementation,
either as a solution or as a reference.

https://github.com/soundcloud/cosine-lsh-join-spark
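
A rough sketch of wiring the thread's TF-IDF output into that library (the
Lsh constructor parameters below are from memory of the project's README,
so treat the names and values as placeholders and check the repo; the
library works on the older mllib IndexedRowMatrix, hence the
Vectors.fromML conversion):

  import com.soundcloud.lsh.Lsh
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
  import org.apache.spark.ml.linalg.{Vector => MLVector}
  import org.apache.spark.storage.StorageLevel

  // Index each document and convert the ml vectors to mllib vectors
  val rows = idfs.rdd.zipWithIndex.map { case (r, i) =>
    IndexedRow(i, Vectors.fromML(r.getAs[MLVector]("tf-idf")))
  }
  val matrix = new IndexedRowMatrix(rows)

  val lsh = new Lsh(
    minCosineSimilarity = 0.5,
    dimensions = 150,
    numNeighbours = 150,
    numPermutations = 10,
    partitions = 200,
    storageLevel = StorageLevel.MEMORY_AND_DISK
  )

  // Pairs of row ids with their estimated cosine similarity
  val similarityMatrix = lsh.join(matrix)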

Thanks,
Kevin


Re: Nearest neighbour search

Posted by Meeraj Kunnumpurath <meeraj@servicesymphony.com>.
That was a bit of a brute-force search, so I changed the code to use a UDF
that computes the dot product between the two IDF vectors, and to sort on
the new column.

package com.ss.ml.clustering

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
import org.apache.spark.ml.linalg.Vector

object ClusteringBasics extends App {

  val spark = SparkSession.builder().appName("Clustering Basics").master("local").getOrCreate()
  import spark.implicits._

  val df = spark.read.option("header", "false").csv("data")

  val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
  val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
  val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")

  val df1 = tf.transform(tk.transform(df))
  val idfs = idf.fit(df1).transform(df1)

  val nn = nearestNeighbour("<http://dbpedia.org/resource/Barack_Obama>", idfs)
  println(nn)

  def nearestNeighbour(uri: String, ds: DataFrame) : String = {
    val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vector]("tf-idf")
    def dorProduct(vectorA: Vector) = {
      var dp = 0.0
      var index = vectorA.size - 1
      for (i <- 0 to index) {
        dp += vectorA(i) * tfIdfSrc(i)
      }
      dp
    }
    val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
    ds.filter(s"_c0 != '$uri'").withColumn("dp",
dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
  }

}


However, that is generating the exception below,

Exception in thread "main" java.lang.RuntimeException: Unsupported literal
type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.Column.$minus(Column.scala:672)
at com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36)
at com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
at com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
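
The trace explains the failure: in Scala, 'tf-idf is parsed as the symbol
'tf minus the idf estimator that is in scope, so Spark ends up calling
Column.$minus and trying to turn the IDF object into a literal. A hedged
sketch of the fix: quote the hyphenated column name with col(...), take a
single argument in the UDF (the source vector is already captured by the
closure), and sort descending so the largest dot product comes first:

    val dpUdf = udf((v: Vector) => dorProduct(v))
    ds.filter(s"_c0 != '$uri'")
      .withColumn("dp", dpUdf(col("tf-idf")))
      .sort(desc("dp"))
      .take(1)(0)
      .getString(1)

Also note that a raw dot product only matches the cosine ranking if the
vectors are unit-normalised first (e.g. via ml.feature.Normalizer);
otherwise long documents win regardless of similarity.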




-- 
*Meeraj Kunnumpurath*
*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*meeraj@servicesymphony.com*

Re: Nearest neighbour search

Posted by Meeraj Kunnumpurath <meeraj@servicesymphony.com>.
This is what I have done; is there a better way of doing this?

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
  import org.apache.spark.ml.linalg.Vector

  val df = spark.read.option("header", "false").csv("data")

  val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
  val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
  val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")

  val df1 = tf.transform(tk.transform(df))
  val idfs = idf.fit(df1).transform(df1)

  println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama", idfs))

  // Linear scan: score every other document against the query vector and
  // keep the best. Rows are collected to the driver first, because mutating
  // res/metric from inside a distributed foreach would only update
  // executor-side copies of the variables.
  def nearestNeighbour(uri: String, ds: DataFrame): String = {
    var res: Row = null
    var metric: Double = 0
    val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vector]("tf-idf")
    ds.filter(s"_c0 != '$uri'").collect().foreach { r =>
      val tfIdfDst = r.getAs[Vector]("tf-idf")
      val dp = dotProduct(tfIdfSrc, tfIdfDst)
      if (dp > metric) {
        res = r
        metric = dp
      }
    }
    res.getAs[String]("_c1")
  }

  // Cosine similarity: dot product over the product of the L2 norms.
  // Ranking by the raw dot product above is only equivalent to this when
  // the vectors are unit-normalised.
  def cosineSimilarity(vectorA: Vector, vectorB: Vector) = {
    var dotProduct = 0.0
    var normA = 0.0
    var normB = 0.0
    for (i <- 0 until vectorA.size) {
      dotProduct += vectorA(i) * vectorB(i)
      normA += Math.pow(vectorA(i), 2)
      normB += Math.pow(vectorB(i), 2)
    }
    dotProduct / (Math.sqrt(normA) * Math.sqrt(normB))
  }

  def dotProduct(vectorA: Vector, vectorB: Vector) = {
    var dp = 0.0
    for (i <- 0 until vectorA.size) {
      dp += vectorA(i) * vectorB(i)
    }
    dp
  }
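
One exact alternative, sketched under the same column names: L2-normalise
the TF-IDF vectors once with ml.feature.Normalizer, after which cosine
similarity is just a dot product, and let Spark do the ranking instead of
folding over collected rows (the nearestNeighbourCosine helper and the
"norm" column here are illustrative, not from the thread):

  import org.apache.spark.ml.feature.Normalizer
  import org.apache.spark.sql.functions.{col, desc, udf}

  // p = 2.0 gives unit L2 norm, so dot product == cosine similarity
  val norm = new Normalizer().setInputCol("tf-idf").setOutputCol("norm").setP(2.0)
  val normed = norm.transform(idfs)

  def nearestNeighbourCosine(uri: String, ds: DataFrame): String = {
    val src = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vector]("norm")
    val cosUdf = udf((v: Vector) => {
      var dp = 0.0
      for (i <- 0 until v.size) dp += v(i) * src(i)
      dp
    })
    ds.filter(s"_c0 != '$uri'")
      .withColumn("cos", cosUdf(col("norm")))
      .sort(desc("cos"))
      .take(1)(0)
      .getAs[String]("_c1")
  }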




-- 
*Meeraj Kunnumpurath*
*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*meeraj@servicesymphony.com*