Posted to user@spark.apache.org by Carsten Schnober <sc...@ukp.informatik.tu-darmstadt.de> on 2015/07/10 12:02:10 UTC

K Nearest Neighbours

Hi,
I have the following problem, which is a kind of special case of k
nearest neighbours.
I have an Array of Vectors (v1) and an RDD[(Long, Vector)] of indexed
vectors, i.e. (index, vector) pairs (v2). The array v1 easily fits into a
single node's memory (~100 entries), but v2 is very large (millions of
entries).

My goal is to find, for each vector in v1, the n entries in v2 at the
smallest distance. The naive solution would be to define a helper function
that computes the distances between one vector from v1 and all vectors in
v2, sorts them, and returns the top n results:

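import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

def computeDistances(vector: Vector, vectors: RDD[(Long, Vector)],
                     n: Int = 10): Seq[Long] = {
    vectors.map { emb => (emb._1, Vectors.sqdist(emb._2, vector)) } // (index, squared distance)
      .sortBy(_._2) // sort by distance, ascending
      .map(_._1) // retain indexes only
      .take(n)
}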
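
Sorting every distance only to keep the first n does more work than necessary. As a minimal sketch of the alternative, written in plain Scala without Spark so that it runs on its own, a bounded max-heap can hold just the n best candidates seen so far. The names sqDist and nearest are made up for this illustration, and vectors are plain Array[Double] instead of MLlib Vector. On an RDD, takeOrdered(n) with an ordering on the distance should play a similar role.

```scala
import scala.collection.mutable

// Squared Euclidean distance between two vectors of the same dimension.
def sqDist(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same dimension")
  var sum = 0.0
  var i = 0
  while (i < a.length) {
    val d = a(i) - b(i)
    sum += d * d
    i += 1
  }
  sum
}

// Indexes of the n entries closest to `query`, nearest first.
// Only n candidates are kept at any time; nothing is sorted globally.
def nearest(query: Array[Double],
            data: Iterator[(Long, Array[Double])],
            n: Int): Seq[Long] = {
  require(n > 0, "n must be positive")
  // Max-heap on distance: the head is the worst candidate kept so far.
  val heap = mutable.PriorityQueue.empty[(Long, Double)](
    Ordering.by[(Long, Double), Double](_._2))
  data.foreach { case (id, vec) =>
    val d = sqDist(query, vec)
    if (heap.size < n) {
      heap.enqueue((id, d))
    } else if (d < heap.head._2) {
      heap.dequeue()        // drop the current worst candidate
      heap.enqueue((id, d)) // and keep the closer one instead
    }
  }
  heap.toList.sortBy(_._2).map(_._1)
}
```

Calling nearest(query, data.iterator, 10) on a local collection returns the same kind of result as computeDistances, i.e. the indexes only, ordered by increasing distance.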

So I can map the entries (after getting the indexes to keep track of the
mappings) in v1 to the distances:

v1.zipWithIndex.map{ v => (computeDistances(v._1, v2), v._2) }

This gives me for each entry in v1 the indexes of the n closest entries
in v2.
However, as v1 is an array, the computeDistances() calls are all done
sequentially (on the driver, if I understand correctly) rather than
distributed.

The problem is that I cannot convert v1 into an RDD, because that would
result in an error due to nested RDD operations in computeDistances().

To conclude, what I would like to do (if it were possible) is this:

val v1: Seq[Vector] = ...
val v2: RDD[(Long, Vector)] = ...
sc.parallelize(v1).zipWithIndex
  .map{ v => (computeDistances(v._1, v2), v._2) }


Is there any good practice to approach problems like this?
Thanks!
Carsten


-- 
Carsten Schnober
Doctoral Researcher
Ubiquitous Knowledge Processing (UKP) Lab
FB 20 / Computer Science Department
Technische Universität Darmstadt
Hochschulstr. 10, D-64289 Darmstadt, Germany
phone [+49] (0)6151 16-6227, fax -5455, room S2/02/B111
schnober@ukp.informatik.tu-darmstadt.de
www.ukp.tu-darmstadt.de

Web Research at TU Darmstadt (WeRC): www.werc.tu-darmstadt.de
GRK 1994: Adaptive Preparation of Information from Heterogeneous Sources
(AIPHES): www.aiphes.tu-darmstadt.de
PhD program: Knowledge Discovery in Scientific Literature (KDSL)
www.kdsl.tu-darmstadt.de




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-Nearest-Neighbours-tp23759.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: K Nearest Neighbours

Posted by Gylfi <gy...@berkeley.edu>.
Hi. 

What I would do in your case is something like this.

Let's call the two datasets qs and ds, where qs is an array of vectors and
ds is an RDD[(dsID: Long, Vector)].

Do the following: 
1) Create a k-NN class that keeps track of the k nearest neighbors found so
far. It must have a qsID and some structure for the k nearest neighbors,
e.g. Seq[(dsID: Long, distance: Double)], and a function
.add(nn: (Long, Vector)) that does the distance calculation and updates the
k-NN when appropriate.
2) Collect the qs and key it as well, so each query vector has an ID, i.e.
qs = Array[(qsID: Long, Vector)].

Now what you want to do is not materialise all the distances, but just the
k-NNs. To do this we will actually create several k-NNs for each query
vector, one per partition, and then merge them later.

3) Do a ds.mapPartitions() and inside the function create a k-NN for each
query vector in qs, scan the ds points of the partition, and output an
iterator over the set of k-NNs created.
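val k = 100
val qs: Array[(Long, Vector)] = ...   // keyed query vectors: (qsID, vector)
val ds: RDD[(Long, Vector)] = ...     // data vectors: (dsID, vector)
val knnResults = ds.mapPartitions( itr => {
      // one empty k-NN per query vector, local to this partition
      val knns = qs.map( qp => (qp._1, new KNNClass(k, qp)) )
      itr.foreach( dp => {
        knns.foreach { case (_, knn) => knn.add( dp ) }
      } )
      knns.iterator   // (qsID, partial k-NN) pairs
})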

Now you have one k-NN per partition for each query point, but we can fix
this simply by doing a reduceByKey that merges all the k-NNs for each qsID
into a single k-NN.

val knnResultFinal = knnResults.reduceByKey( (a, b) => KNNClass.merge(a, b) )

Here KNNClass.merge is a static function that merges two k-NNs: we simply
concatenate them, sort on distance, take the top k values, and return them
as a new k-NN object.
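
A minimal sketch of what such a class could look like, under some loud assumptions: vectors are plain Array[Double] rather than MLlib Vector so the block runs without Spark, the constructor takes the query vector directly instead of the keyed pair, and the accessor neighbours and the helper sqDist are names invented for this example. It is one possible shape for the class, not a reference implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical accumulator for the k nearest neighbours of one query vector.
final class KNNClass(val k: Int, val query: Array[Double]) extends Serializable {
  require(k > 0, "k must be positive")

  // (dsID, squared distance) pairs, sorted by distance, never longer than k.
  private val best = ArrayBuffer.empty[(Long, Double)]

  // Current neighbours, nearest first.
  def neighbours: Seq[(Long, Double)] = best.toList

  // Compute the distance to one data point and keep it if it qualifies.
  def add(dp: (Long, Array[Double])): Unit =
    insert(dp._1, KNNClass.sqDist(query, dp._2))

  private def insert(id: Long, dist: Double): Unit = {
    if (best.length < k || dist < best.last._2) {
      // Insert before the first entry that is farther away.
      val pos = best.indexWhere(_._2 > dist)
      if (pos < 0) best += ((id, dist)) else best.insert(pos, (id, dist))
      // Drop the farthest entry if the buffer grew beyond k.
      if (best.length > k) best.remove(best.length - 1)
    }
  }
}

object KNNClass {
  // Squared Euclidean distance; assumes both arrays have the same length.
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Merge two partial results for the same query:
  // concatenate, sort on distance, keep the k nearest.
  def merge(a: KNNClass, b: KNNClass): KNNClass = {
    val out = new KNNClass(a.k, a.query)
    (a.neighbours ++ b.neighbours).sortBy(_._2).take(a.k)
      .foreach { case (id, d) => out.insert(id, d) }
    out
  }
}
```

Because merge only looks at the neighbours already stored in a and b, it never recomputes distances, which keeps the reduce step cheap compared with the partition scan.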

If you want to control how many k-NNs are created, you can always
repartition ds.
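
To illustrate why the number of partitions only affects how many partial k-NNs exist and not the final answer, here is a small simulation in plain Scala. It is only a sketch: slices of a local Seq stand in for RDD partitions, and scanPartition, mergePartials and knn are hypothetical names that do not exist in Spark.

```scala
// Squared Euclidean distance; assumes both arrays have the same length.
def sqDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// Work done inside one partition: the k nearest (dsID, distance) per qsID.
def scanPartition(queries: Seq[(Long, Array[Double])],
                  part: Seq[(Long, Array[Double])],
                  k: Int): Map[Long, List[(Long, Double)]] =
  queries.map { case (qid, q) =>
    qid -> part.map { case (id, v) => (id, sqDist(q, v)) }
      .sortBy { case (id, d) => (d, id) } // ties broken by dsID
      .take(k).toList
  }.toMap

// The reduce step: merge two partial results, query by query.
def mergePartials(a: Map[Long, List[(Long, Double)]],
                  b: Map[Long, List[(Long, Double)]],
                  k: Int): Map[Long, List[(Long, Double)]] =
  (a.keySet ++ b.keySet).map { qid =>
    qid -> (a.getOrElse(qid, Nil) ++ b.getOrElse(qid, Nil))
      .sortBy { case (id, d) => (d, id) }
      .take(k)
  }.toMap

// Split the data into roughly numPartitions slices, scan each, merge all.
def knn(queries: Seq[(Long, Array[Double])],
        data: Seq[(Long, Array[Double])],
        k: Int,
        numPartitions: Int): Map[Long, List[(Long, Double)]] = {
  require(numPartitions > 0, "numPartitions must be positive")
  val size = math.max(1, math.ceil(data.size.toDouble / numPartitions).toInt)
  data.grouped(size)
    .map(part => scanPartition(queries, part, k))
    .reduceOption((a, b) => mergePartials(a, b, k))
    .getOrElse(Map.empty[Long, List[(Long, Double)]])
}
```

With the same queries, data and k, calling knn with numPartitions set to 1 or to 4 yields the same map. More partitions mean more, smaller partial results to merge, and fewer partitions mean less merging but less parallelism in the scan.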

How does that sound? Does this make any sense?   :) 

Regards, 
    Gylfi. 



