Posted to user@mahout.apache.org by Reinis Vicups <ma...@orbit-x.de> on 2014/08/06 01:50:59 UTC

RowSimilarityJob implementation with Spark

Hi,

We have had good results with RowSimilarityJob in our use case, with some 
quality loss due to pruning and a decline in performance when thresholds 
are set too high or too low.

The current work on Mahout integration with Spark done by dlyubimov, 
pferrel and others is just amazing (although I would love to see more 
inline comments, especially for classes like SparkEngine.scala, or for 
whatever the heck those are in org.apache.mahout.sparkbindings.blas :D )!

Either I haven't looked hard enough, or the RowSimilarityJob for Spark 
is not implemented just yet (will it be at some point?). So my two 
colleagues (hi Nadine, hi Wolfgang) and I attempted to replicate 
RowSimilarityJob (to some extent) in Spark.

I am providing the very first and overly simplified version of the 
implementation below and would greatly appreciate any feedback on 
whatever aspect of our implementation you would like to comment on!

Some fun facts:
We have roughly 10k support tickets containing textual information 
(title, description, comments of participating agents, solutions, 
attachment texts that we are also extracting). The raw data we import 
from relational DB into HDFS is roughly 1.5 GB. The resulting TFIDF data 
is roughly 80 MB and contains 10k vectors with the dimensionality of 
roughly 300k.

The job is executed on our test cluster consisting of dual-core machines, 
with 32 GB RAM for the 1 master node and 16 GB for the 4 worker nodes. As 
you will see in the implementation, we currently do no pruning, no 
limiting of observations and no thresholding, so it runs on the whole 
corpus and completes in 18 to 22 minutes. We have observed less-than-linear 
horizontal scalability (we started with one node, then three, then four, 
and the execution time reduced only slightly). Mahout's RowSimilarityJob 
with maxed-out observations completes in 12 minutes for the same data set, 
using these options:

"--similarityClassname", "SIMILARITY_COSINE",
"-m", "100000",
"--maxObservationsPerRow", "900000",
"--maxObservationsPerColumn", "900000",
"-ess", "true"

Thanks, guys, for taking the time to read this and, again, any feedback 
(especially on how to improve the implementation ;) ) is greatly appreciated.
reinis

---
The code:

import com.google.common.io.ByteStreams
import our.hbase.HBaseConversions._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.tailrec
import scala.collection.LinearSeq

object RowSimilaritySparkJob {
  def main(args: Array[String]): Unit = {
    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob")
    val sc = new SparkContext(sConf)

    @transient val hConf = HBaseConfiguration.create()
    val fullTableName = "sparsevectortfidfvectors"
    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)

    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    val documentRows = rdd.map { get =>
      // read in tfidf from HBase
      val vectorWritable = new VectorWritable()
      val vectorByteArray = get._2.getValue("d", "vector")
      vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
      val vector = vectorWritable.get()

      // output tuple (documentId, tfidfVector)
      (bytesToInt(get._1.get()), vector)
    }

    /* Stage 1 : normalize & transpose */
    documentRows

      // we do no normalization since we use cosine similarity and
      // seq2sparse already generates normalized tfidf values.
      // write in which documents the given termid appears
      // (term-document co-occurrence) as a tuple
      // (termid, (documentid, tfidf of term in document))
      .flatMap { case (documentId, tfidfVector) =>
        import scala.collection.JavaConversions._
        for (element <- tfidfVector.nonZeroes()) yield {
          // output multiple tuples
          // (termid, (documentid, tfidf of term in document))
          element.index() -> (documentId -> element.get())
        }
      }

      // combine term-document co-occurrence fragments by merging the
      // vectors and thus creating the full vector (or in our case linear
      // sequence) of documents the termid appears in:
      // (termid -> Seq((documentId -> term-tfidf in document),
      //                (anotherDocumentId -> term-tfidf in that document), ...))
      .combineByKey[LinearSeq[(Int, Double)]](
        (x: (Int, Double)) => LinearSeq[(Int, Double)](x),
        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => v :+ t,
        // merge the partial sequences built in different partitions
        (combiner: LinearSeq[(Int, Double)],
         combinee: LinearSeq[(Int, Double)]) => combiner ++ combinee,
        // number of partitions, depending on available hardware (cpus)
        40)

      /* Stage 2 : co-occurrence triangular matrix */
      // write the upper half of the document co-occurrence matrix
      // (triangular matrix), one matrix per termid.
      // we take the sequences generated in the previous step and pair up
      // their elements (tuples documentid -> tfidf). Row id and column id
      // are documentids from the sequence of the term, and the value is
      // the term-tfidf of both documents multiplied with each other (the
      // contribution of this term to the cosine similarity).
      // result is a sequence of tuples
      // ((leftDocumentId, rightDocumentId), left-tfidf * right-tfidf)
      // representing the upper triangular matrix, one per termid
      .flatMap { x: (Int, LinearSeq[(Int, Double)]) =>
        triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2)
      }

      // tuples from the previous step are reduced by key
      // (leftDocumentId, rightDocumentId) over all termid matrices.
      // since most document pairs share multiple terms, there will be
      // entries for a given (leftDocumentId, rightDocumentId) in multiple
      // matrices; reducing by key sums them up over all common terms.
      // this results in a sequence representing the triangular matrix
      // aggregated by (leftDocumentId, rightDocumentId):
      // ((leftDocumentId, rightDocumentId), sum of all term-tfidf products),
      // ((leftDocumentId, anotherRightDocId), sum of all products), ...
      .reduceByKey(_ + _, 40)

      /* Stage 3 : create similarities */
      // symmetrize the document co-occurrence matrix by generating the
      // lower triangular matrix from the upper triangular matrix through
      // swapping left document id and right document id (the original
      // ((leftDocId, rightDocId), tfidfSum) is retained as well, of course)
      .flatMap { x: ((Int, Int), Double) =>
        LinearSeq(x, ((x._1._2, x._1._1), x._2))
      }

      // save in hadoop
      .saveAsTextFile("row-similarity-test")
  }

  /**
   * Produces LinearSeq representing triangular matrix of document
   * co-occurrences
   * @param accumulator is a temporary variable used by recursion
   *                    (enables tail-recursion)
   * @param corpus shall contain at least two elements (2 document
   *               occurrences), otherwise the accumulator is returned as is
   * @return Seq of document co-occurrences formatted as
   *         ((smallerDocumentId -> largerDocumentId) -> leftTFIDF * rightTFIDF)
   */
  @tailrec private def triangularCoOccurrenceMatrix(
      accumulator: LinearSeq[((Int, Int), Double)],
      corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] =
    corpus match {
      case h +: t if t.nonEmpty =>
        // order each key as (smaller id, larger id): the order of documents
        // within a term's sequence is not guaranteed, and reduceByKey must
        // see the same key for the same pair of documents
        val row = t.map { e =>
          ((math.min(h._1, e._1), math.max(h._1, e._1)), h._2 * e._2)
        }
        triangularCoOccurrenceMatrix(accumulator ++ row, t)
      // fewer than two documents left: nothing more to pair
      case _ => accumulator
    }
}
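
As a cross-check of what the three stages above compute, here is a minimal sketch that runs the same steps on plain in-memory Scala collections, without Spark or HBase. The names LocalRowSimilarity, termRows, pairs and similarities are made up for this illustration and are not part of the posted job or of Mahout. It assumes the tf-idf vectors are already normalized to unit length, as the comments in the job state, so that the summed products are the cosine similarity.

```scala
object LocalRowSimilarity {
  // (documentId, termId -> normalized tf-idf weight); hypothetical input shape
  type Doc = (Int, Map[Int, Double])

  /** Stage 1: transpose documents into termId -> Seq((documentId, weight)). */
  def termRows(docs: Seq[Doc]): Map[Int, Seq[(Int, Double)]] =
    docs
      .flatMap { case (docId, vec) =>
        vec.toSeq.map { case (termId, w) => termId -> (docId -> w) }
      }
      .groupBy(_._1)
      .map { case (termId, entries) => termId -> entries.map(_._2) }

  /** Stage 2: for one term, emit each document pair once, keyed as
    * (smaller id, larger id), with the product of the two weights. */
  def pairs(postings: Seq[(Int, Double)]): Seq[((Int, Int), Double)] =
    for {
      ((leftId, leftWeight), i) <- postings.zipWithIndex
      (rightId, rightWeight) <- postings.drop(i + 1)
    } yield ((math.min(leftId, rightId), math.max(leftId, rightId)),
             leftWeight * rightWeight)

  /** Sum the per-term products for every document pair, which gives the
    * dot product of the two document vectors. */
  def similarities(docs: Seq[Doc]): Map[(Int, Int), Double] =
    termRows(docs).values.toSeq
      .flatMap(pairs)
      .groupBy(_._1)
      .map { case (pair, products) => pair -> products.map(_._2).sum }
}
```

For the two unit-length documents 1 -> Map(10 -> 0.6, 11 -> 0.8) and 2 -> Map(10 -> 0.8, 11 -> 0.6), similarities returns a single entry for (1, 2) with a value of about 0.96, which is their dot product.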

Re: RowSimilarityJob implementation with Spark

Posted by Ted Dunning <te...@gmail.com>.
On Thu, Aug 7, 2014 at 3:22 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

> During my tests I observed that there were always 2-3-4 long running tasks
> that determined the critical path of the whole spark job (as in, there was
> one task running for whole 18 minutes). Also I observed that only through
> increasing number of partitions those long running tasks got shorter! So I
> was increasing gradually number of partitions and at 400 partitions it
> finally rocked.
>

This is likely due to skew in the statistics of different items.

As you increase the size of the data, you may see better balance because
the maximum frequency limit will kick in more often.
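
To make the skew concrete: a term that occurs in n documents contributes n*(n-1)/2 document pairs in the pair-generating step, so a few very frequent terms can account for most of the work. The sketch below only does that arithmetic. PairSkew and its methods are hypothetical names for illustration, and cappedPairCount assumes that a frequency limit simply truncates a term's list of documents to a maximum length, which is a simplification of whatever sampling the real job applies.

```scala
object PairSkew {
  /** Number of document pairs emitted for a term found in `docFrequency` documents. */
  def pairCount(docFrequency: Long): Long =
    docFrequency * (docFrequency - 1) / 2

  /** Pair count if at most `maxObservations` documents are kept per term
    * (assumed behaviour of a frequency limit, for illustration only). */
  def cappedPairCount(docFrequency: Long, maxObservations: Long): Long =
    pairCount(math.min(docFrequency, maxObservations))

  /** Fraction of all emitted pairs that comes from the single heaviest term. */
  def shareOfHeaviestTerm(docFrequencies: Seq[Long]): Double = {
    val counts = docFrequencies.map(pairCount)
    val total = counts.sum
    if (total == 0L) 0.0 else counts.max.toDouble / total
  }
}
```

With the 10k documents mentioned in the original post, a term present in every document would yield PairSkew.pairCount(10000) = 49,995,000 pairs, while a term present in 10 documents yields 45.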

Re: RowSimilarityJob implementation with Spark

Posted by Reinis Vicups <ma...@orbit-x.de>.
OK, I did a number of refactorings, and one of them blew my mind.

As you may or may not know, this:

.reduceByKey(_ + _, 40) // do it with 40 partitions

causes Spark to partition the data into 40 parts using, as I understand 
it, the default hash-based partitioner. During execution these 
partitions get processed by separate tasks, which in turn are distributed 
to and spawned (in parallel) on the available workers (nodes).

During my tests I observed that there were always two to four 
long-running tasks that determined the critical path of the whole Spark 
job (as in, there was one task running for the whole 18 minutes). I also 
observed that those long-running tasks only got shorter when I increased 
the number of partitions! So I gradually increased the number of 
partitions, and at 400 partitions it finally rocked.

The very same job that previously ran for 18-22 minutes now completes 
in under 3!
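
A rough way to see why the partition count matters: a hash partitioner places a key in partition (key.hashCode mod numPartitions), so each task gets whatever keys happen to land in its partition. The sketch below simulates that rule on plain Scala collections. PartitionLoad, the abstract work units and the example keys are assumptions for illustration; it does not model Spark's scheduler or the actual key distribution of this job.

```scala
object PartitionLoad {
  /** Partition index by the usual hash rule: non-negative
    * key.hashCode modulo the number of partitions. */
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  /** Total work per partition, given (key, work units) pairs. */
  def loads(work: Seq[(Int, Long)], numPartitions: Int): Vector[Long] = {
    val buckets = Array.fill(numPartitions)(0L)
    for ((key, units) <- work)
      buckets(partitionOf(key, numPartitions)) += units
    buckets.toVector
  }

  /** Heaviest partition, a rough proxy for the longest-running task. */
  def heaviest(work: Seq[(Int, Long)], numPartitions: Int): Long =
    loads(work, numPartitions).max
}
```

With 4,000 keys of one work unit each, the heaviest of 40 partitions carries 100 units and the heaviest of 400 carries 10. More partitions cannot split a single heavy key, though: if one key alone costs 1,000 units, the heaviest partition stays at 1,000 or more however many partitions there are.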

I don't know much about Spark scheduling or what overhead there is for 
spawning and despawning tasks, but what I observe in my tests looks as if
- tasks are queued on all available worker nodes by simple round robin;
- queued tasks get executed up to 100% CPU load, while the others wait 
until the load drops and then get spawned (I am not sure, but it looks as 
if each task requires a more or less equal portion of CPU power);
- spawn/despawn overhead is really small (I read someplace that it's 
about 200ms per task);
- the default partitioner is not that smart, but even with brute force (as 
in, just increase partitions to some large number) it manages to make 
small enough partitions.

So the combo of a large number of partitions and, if possible, a smart 
partitioner implementation does the trick in the case of RSJ.
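
One possible reading of a "smart partitioner" is an assignment that takes the expected cost per key into account instead of hashing. As a sketch of that idea only (nothing in this thread says Spark or Mahout provides it), the code below uses the greedy longest-processing-time-first heuristic: sort keys by estimated cost and give each one to the currently lightest partition. BalancedAssignment is a hypothetical name, keys are assumed to be unique, and the per-key cost estimates are assumed to be known up front, for example from term frequencies.

```scala
object BalancedAssignment {
  /** Greedy longest-processing-time-first assignment: heaviest keys
    * first, each to the partition with the smallest load so far.
    * Returns key -> partition index. Keys are assumed to be unique. */
  def assign(work: Seq[(Int, Long)], numPartitions: Int): Map[Int, Int] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val loads = Array.fill(numPartitions)(0L)
    work.sortBy { case (_, units) => -units }.map { case (key, units) =>
      val target = loads.indexOf(loads.min)
      loads(target) += units
      key -> target
    }.toMap
  }

  /** Load of the heaviest partition under a given assignment. */
  def heaviest(work: Seq[(Int, Long)], assignment: Map[Int, Int]): Long =
    work.groupBy { case (key, _) => assignment(key) }
      .values.map(_.map(_._2).sum).max
}
```

For keys 0, 4, 8, 1, 2, 3 with costs 8, 7, 6, 5, 4, 3 and 4 partitions, plain modulo-4 hashing puts keys 0, 4 and 8 together for a heaviest partition of 21 units, while this assignment yields a heaviest partition of 9 units. The resulting Map could back the lookup inside a custom partitioner.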

I will be testing the job with a much larger data set (328k "fat" 
documents instead of 10k) and will report on the findings.

kind regards
reinis

P.S. Pat, thank you so much for the link to the docs, no idea how the heck I 
overlooked them!


Re: RowSimilarityJob implementation with Spark

Posted by Pat Ferrel <pa...@occamsmachete.com>.
I ran itemsimilarity on the epinions data on a small 3-machine cluster and found that it ran in 20-some minutes using the old Hadoop version, while spark-itemsimilarity ran in 2+ minutes, so it looks like 10x the performance. This is using the same “cheats” in both cases.

100x is not likely to be seen except in special cases that fit Hadoop’s MapReduce poorly and Spark much better, for example if a pipeline is long, requiring lots of serialization and IO on Hadoop and none using Spark’s in-memory RDDs. I doubt we’ll see that with RSJ, which is fairly simple.

Other comments inline

> On Aug 5, 2014, at 9:38 PM, Reinis Vicups <ma...@orbit-x.de> wrote:
> 
> Yes, that would make usage of the threshold and the expected(?) quality of the result better.

It may also have some of the same side effects as the current threshold, like having no similar items for some. But it does produce an average “quality” better than other sub-sampling techniques.

> 
> The configuration, though, was not the main reason for us to try to implement RowSimilarityJob with Spark, but rather
> - the promise of Spark to work 100x faster than Hadoop's MapReduce,
> - the existence of "cheats" in the current RowSimilarityJob (observations, threshold, max similarities per row); we would like to have a super-performant "lossless" solution.
> 
> With our initial implementation we observe that the Spark implementation performs worse than the original RSJ, and I was wondering if the community could give us a hint as to why.
> 
> I am making a couple of guesses myself:
> - we're not using any checkpointing, caching, sharing or broadcasting of intermediate results, and I am kinda unsure whether that is applicable to RSJ;

It does apply. Mahout uses caching to keep some data in memory and there is an optimizer (BLAS type) built into the DRM math calculations. Scala supports lazy evaluation, which allows the optimizer to work invisibly in the background. This means a “slim” A’A to seed the item similarity calc.

> - we're not using KryoSerializer; possibly that could have a noticeable impact on performance;

We are using kryo. Can’t speak to the speed comparison but maybe others can. 

> - we're using only standard Scala collections. I did use breeze.linalg SparseVector at some point but I couldn't observe any performance increase. Do you guys have any experience with using specialized linalg collections versus Scala's LinearSeq versus Scala's IndexedSeq?

Mahout uses Scala extensions and conversions to allow Scala-style use of Mahout's optimized Java SparseVector types and iterators. The DRM has been completely implemented for Spark and has some optimizations. One recent optimization was in the way non-zero elements of a sparse vector are iterated.

If you’d like to use any of this I suggest reading the docs on the Mahout Scala DSL here: http://mahout.apache.org/users/sparkbindings/home.html and the full PDF here: http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf

You can even fire up the Spark/Scala shell and play with your code interactively.

> 
> thanks
> reinis
> 
> On 06.08.2014 03:04, Pat Ferrel wrote:
>> Cooccurrence is implemented on Spark and a ticket for doing the RowSimilarityJob was entered today. Should be fairly easy since ItemSimilarity is implemented. Will use only LLR for now. The driver reads text files (a text version of DRM).
>> 
>> If you want to wrap Spark cooccurrence yourself you can read in a DRM from the Mahout hadoop tf-idf code’s sequence file (it’s a one-liner). If you want to try any of this let me know and I’ll give some pointers.
>> 
>> I’ve been wondering about the threshold too. It seems like an absolute threshold is very hard to use. I was thinking that some % of values might be a better measure. Imagine you want 100 similar items per item, so 100 per row in the similarity matrix: you would set 100 per row as your threshold in this future Spark-RSJ. But you’d get 100 on average; it wouldn’t be guaranteed for any particular item. Overall you’d get 100 * number of rows, and they’d be the items with the highest similarity scores of the total created. Do you think this would help?
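
The idea in the paragraph above can be sketched on plain Scala collections: instead of an absolute score threshold, keep a global budget of perRow * numRows entries and retain the highest-scoring ones. GlobalTopN and keepTop are hypothetical names for illustration, not the API of any existing or planned Mahout job, and an implementation on Spark would likely need a distributed way to find the cutoff rather than a full in-memory sort.

```scala
object GlobalTopN {
  /** Keep the (perRow * numRows) highest-scoring entries overall.
    * Entries are ((rowId, columnId), score). Individual rows are not
    * guaranteed to keep `perRow` entries, only the average is bounded. */
  def keepTop(similarities: Seq[((Int, Int), Double)],
              perRow: Int,
              numRows: Int): Seq[((Int, Int), Double)] = {
    val budget = perRow.toLong * numRows
    val limit = math.min(budget, similarities.size.toLong).toInt
    similarities.sortBy { case (_, score) => -score }.take(limit)
  }
}
```

If rows 1 and 2 hold the highest scores, a budget of three entries for three rows can leave row 3 with no similar items at all while another row keeps two, which is the "100 on average, not guaranteed" effect described above.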
>> 
>> On Aug 5, 2014, at 4:50 PM, Reinis Vicups <ma...@orbit-x.de> wrote:
>> 
>> Hi,
>> 
>> we have had good results with RowSimilarityJob in our Use Case with some quality loss due to pruning and decline in performance if setting thresholds too high/low.
>> 
>> The current work on mahout integration with spark done by dlyubimov, pferrel and others is just amazing (although I would love to see more inline comments especially for classes like SparkEngine.scala or what the heck are those in org.apache.mahout.sparkbindings.blas :D )!
>> 
>> Either I haven't looked hard enough, or the RowSimilarityJob for spark is not implemented just yet (will it be at some point)? So my two colleagues (hi Nadine, hi Wolfgang) and me attempted to replicate RowSimilarityJob (to some extent) in spark.
>> 
>> I am providing the very first and overly simplified version of implementation below and would greatly appreciate any feedback on whatever aspect of out implementation you would like to comment on!
>> 
>> Some fun facts:
>> We have roughly 10k support tickets containing textual information (title, description, comments of participating agents, solutions, attachment texts that we are also extracting). The raw data we import from relational DB into HDFS is roughly 1.5 GB. The resulting TFIDF data is roughly 80 MB and contains 10k vectors with the dimensionality of roughly 300k.
>> 
>> The job is executed on our test cluster consisting of dual core machines with 32GB ram for 1 master node and 16GB for 4 worker nodes. As you will see in implementation, currently we do no pruning, no limitations of observations, no thresholds - so it runs on whole corpus. And completes in 18 to 22 minutes. We have observed some non-linear horizontal scalability (started with one node, then three, then four and the execution time reduced slightly). The mahout's RowSimilarityJob with maxed out observations completes in 12 minutes for the same data set.
>> 
>> "--similarityClassname", "SIMILARITY_COSINE",
>> "-m", "100000",
>> "--maxObservationsPerRow", "900000",
>> "--maxObservationsPerColumn", "900000",
>> "-ess", "true"
>> 
>> Thanks, guys, for your time reading this and, again, any feedback (especially on how to improve the implementation ;) ) is greatly appreciated
>> reinis
>> 
>> ---
>> The code:
>> 
>> import com.google.common.io.ByteStreams
>> import our.hbase.HBaseConversions._
>> import org.apache.hadoop.hbase.HBaseConfiguration
>> import org.apache.hadoop.hbase.client.Result
>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.mahout.math.VectorWritable
>> import org.apache.spark.SparkContext.rddToPairRDDFunctions
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> import scala.annotation.tailrec
>> import scala.collection.LinearSeq
>> 
>> object RowSimilaritySparkJob {
>> def main(args: Array[String]) {
>>    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob ")
>>    val sc = new SparkContext(sConf)
>> 
>>    @transient val hConf = HBaseConfiguration.create()
>>    val fullTableName = "sparsevectortfidfvectors"
>>    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)
>> 
>>    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
>> 
>>    val documentRows = rdd.map { get =>
>>      // read-in tfidf from HBase
>>      val vectorWritable = new VectorWritable()
>>      val vectorByteArray = get._2.getValue("d", "vector")
>> vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
>>      val vector = vectorWritable.get()
>> 
>>      // output tuple (documentId, tfidfVector)
>>      (bytesToInt(get._1.get()), vector)
>>    }
>> 
>>    /* Stage 1 : normalize & transpose */
>>    val termRows = documentRows
>> 
>>       // we do no normalization since we use cosine similarity and seq2sparse already generates normalized tfidf values
>>      // write in what documents the given termid appears in (term-document co-occurrence) as a tuple (termid, (documentid, tfidf of term in document))
>>      .flatMap { case(documentId, tfidfVector) =>
>>        import scala.collection.JavaConversions._
>>        for( element <- tfidfVector.nonZeroes() ) yield {
>>          // output multiple tuples (termid, (documentid, tfidf of term in document))
>>          element.index() -> (documentId -> element.get())
>>        }
>>      }
>> 
>>      // combine term-document co-occurrence fragment by merging the vectors and thus creating
>>      // full vector (or in our case linear sequence) of documents the termid appears in
>>      // (termid -> Seq((documentId -> term-tfidf in document), (anotherDocumentId -> term-tfidf in that document), ...))
>>      .combineByKey[LinearSeq[(Int, Double)]](
>>        (x: (Int, Double)) => { LinearSeq[(Int, Double)](x._1 -> x._2) },
>>        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => { v :+ t },
>>        // merge the partial sequences that were built on different partitions
>>        (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, Double)]) => { combiner ++ combinee },
>>        40) // number of partitions; depends on the available hardware (cpus)
>> 
>>      /* Stage 2 : co-occurrence triangular matrix */
>>      // write the upper half of the document co-occurrence matrix (triangular matrix), one matrix per termid.
>>      // we take the sequences generated in the previous step and traverse their elements (tuples documentid -> tfidf),
>>      // building triangular matrix elements ((rowid, colid) -> row-col value). Here rowid and colid are documentids from the
>>      // sequence of the term, and the row-col value is the product of the term's tfidf in both documents (one summand of the cosine similarity).
>>      // result is a sequence of tuples ((leftDocumentId, rightDocumentId) -> left-tfidf * right-tfidf)
>>      // representing the elements of the upper triangular matrix, one matrix per termid
>>      .flatMap { x: (Int, LinearSeq[(Int, Double)]) => triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2) }
>> 
>>      // tuples from the previous step are reduced by key (leftDocumentId, rightDocumentId) over all termid matrices.
>>      // since most documents share multiple terms, a given (leftDocumentId, rightDocumentId) pair appears in multiple matrices.
>>      // reducing by key sums the per-term products of each pair over all common terms.
>>      // this results in a sequence representing the triangular matrix aggregated by (leftDocumentId, rightDocumentId):
>>      // (((leftDocumentId, rightDocumentId), sum of all term-tfidf products), ((leftDocumentId, anotherRightDocId), sum of all term-tfidf products), ...)
>>      .reduceByKey(_ + _, 40)
>> 
>>      /* Stage 3 : create similarities */
>>      // symmetrize the document co-occurrence matrix by generating the lower triangular matrix from the upper triangular matrix through
>>      // swapping left document id and right document id (the original ((leftdocid, rightdocid), tfidfsum) is retained as well, of course)
>>      .flatMap { x: ((Int, Int), Double) =>
>>          LinearSeq(x, ((x._1._2, x._1._1), x._2))
>>      }
>> 
>>      // save in hadoop
>>      .saveAsTextFile("row-similarity-test")
>> }
>> 
>> /**
>> * Produces a LinearSeq representing the upper triangular matrix of document co-occurrences for one term
>> * @param accumulator collects the result across recursive calls (enables tail-recursion)
>> * @param corpus documents containing the term; with fewer than two elements the accumulator is returned unchanged
>> * @return Seq of document co-occurrences formatted as ((leftDocumentId -> rightDocumentId) -> leftTFIDF * rightTFIDF)
>> */
>> @tailrec private def triangularCoOccurrenceMatrix(accumulator: LinearSeq[((Int, Int), Double)], corpus: LinearSeq[(Int, Double)]): LinearSeq[((Int, Int), Double)] = corpus match {
>>  // pair the head document with every remaining document, then recurse on the tail
>>  case h +: t if t.nonEmpty => triangularCoOccurrenceMatrix(accumulator ++ (for (e <- t) yield ((h._1, e._1), e._2 * h._2)), t)
>>  // zero or one document left: no more pairs to emit (also avoids a MatchError on an empty corpus)
>>  case _ => accumulator
>> }
>> }
> 
> 
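
A note on the Stage 2 and Stage 3 comments in the quoted code: summing the per-term products for a document pair gives the dot product of the two rows. That equals the cosine similarity only when every row already has unit L2 length, which depends on how the tf-idf vectors were generated. The following is a minimal pure-Scala check of that relationship, not part of the original job; the object name, the helper names and the two example rows are made up for illustration.

```scala
object CosineCheck {
  // Hypothetical sparse row: termId -> weight (not taken from the thread's data).
  type SparseRow = Map[Int, Double]

  /** Sum of products over the terms both rows share (what Stage 2 + reduceByKey computes). */
  def dot(a: SparseRow, b: SparseRow): Double =
    a.iterator.collect { case (term, w) if b.contains(term) => w * b(term) }.sum

  /** Euclidean (L2) length of a row. */
  def norm(a: SparseRow): Double = math.sqrt(dot(a, a))

  /** Textbook cosine similarity: dot product divided by both lengths. */
  def cosine(a: SparseRow, b: SparseRow): Double = dot(a, b) / (norm(a) * norm(b))

  /** Scale a row to unit L2 length. */
  def normalize(a: SparseRow): SparseRow = {
    val n = norm(a)
    a.map { case (term, w) => term -> w / n }
  }

  def main(args: Array[String]): Unit = {
    val a: SparseRow = Map(0 -> 3.0, 2 -> 4.0)
    val b: SparseRow = Map(0 -> 1.0, 1 -> 2.0, 2 -> 2.0)
    println(s"dot of raw rows:        ${dot(a, b)}")
    println(s"cosine of raw rows:     ${cosine(a, b)}")
    println(s"dot of normalized rows: ${dot(normalize(a), normalize(b))}")
  }
}
```

If the raw dot product and the cosine differ on the real data, the rows are not unit length and the summed products would need to be divided by the two row norms.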

Re: RowSimilarityJob implementation with Spark

Posted by Reinis Vicups <ma...@orbit-x.de>.
Yes, that would improve both the usability of the threshold and the 
expected(?) quality of the result.

The configuration, though, was not our main reason for trying to 
implement RowSimilarityJob with Spark. The reasons were rather:
- the promise that Spark works 100x faster than Hadoop's map-reduce,
- the existence of "cheats" in the current RowSimilarityJob 
(observations, threshold, max similarities per row); we would like to 
have a super performant "lossless" solution.

With our initial implementation we observe that the Spark implementation 
performs worse than the original RSJ, and I was wondering whether the 
community could give us a hint as to why.

I have a couple of guesses myself:
- we're not using any checkpointing, caching, sharing or broadcasting of 
intermediate results, and I am kind of unsure whether that is applicable 
to RSJ;
- we're not using the KryoSerializer; possibly that could have a 
noticeable impact on performance;
- we're using only standard Scala collections. I did use breeze.linalg 
SparseVector at some point, but I couldn't observe any performance 
increase. Do you guys have any experience with using specialized linalg 
collections versus Scala's LinearSeq versus Scala's IndexedSeq?
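
On the last guess, one thing can be said without a benchmark: the LinearSeq(...) factory builds an immutable List, and both `:+` and `++` on a List copy the elements already collected. So `v :+ t` in combineByKey and `accumulator ++ ...` in the recursion each do work proportional to what has been accumulated so far. Below is a minimal sketch of the same pair generation over an array with an append-friendly buffer. It is not the job's actual code and has not been measured on the cluster; the object and method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object PairGeneration {
  /**
   * For one term, emit every pair of documents (left before right by position)
   * together with the product of their weights. Same output order as the
   * recursive version, but appending to an ArrayBuffer is amortized O(1).
   */
  def pairs(docs: Array[(Int, Double)]): ArrayBuffer[((Int, Int), Double)] = {
    val out = ArrayBuffer.empty[((Int, Int), Double)]
    var i = 0
    while (i < docs.length) {
      val (leftId, leftWeight) = docs(i)
      var j = i + 1
      while (j < docs.length) {
        val (rightId, rightWeight) = docs(j)
        out += (((leftId, rightId), leftWeight * rightWeight))
        j += 1
      }
      i += 1
    }
    out
  }
}
```

The number of emitted pairs is still quadratic in the number of documents per term; the sketch only removes the repeated copying on top of that.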

thanks
reinis

On 06.08.2014 03:04, Pat Ferrel wrote:
> Cooccurrence is implemented on Spark and a ticket for doing the RowSimilarityJob was entered today. It should be fairly easy since ItemSimilarity is implemented. It will use only LLR for now. The driver reads text files (a text version of DRM).
>
> If you want to wrap Spark cooccurrence yourself, you can read in a DRM from the Mahout Hadoop tf-idf code’s sequence file (it’s a one-liner). If you want to try any of this, let me know and I’ll give some pointers.
>
> I’ve been wondering about the threshold too. An absolute threshold seems very hard to use. I was thinking that some % of values might be a better measure. Imagine you want 100 similar items per item, so 100 per row in the similarity matrix: you would set 100 per row as your threshold in this future Spark-RSJ. But you’d get 100 on average; it wouldn’t be guaranteed for any particular item. Overall you’d get 100 * number of rows values, and they’d be the ones with the highest similarity scores of all those created. Do you think this would help?


Re: RowSimilarityJob implementation with Spark

Posted by Pat Ferrel <pa...@gmail.com>.
Cooccurrence is implemented on Spark and a ticket for doing the RowSimilarityJob was entered today. It should be fairly easy since ItemSimilarity is implemented. It will use only LLR for now. The driver reads text files (a text version of DRM).

If you want to wrap Spark cooccurrence yourself, you can read in a DRM from the Mahout Hadoop tf-idf code’s sequence file (it’s a one-liner). If you want to try any of this, let me know and I’ll give some pointers.

I’ve been wondering about the threshold too. An absolute threshold seems very hard to use. I was thinking that some % of values might be a better measure. Imagine you want 100 similar items per item, so 100 per row in the similarity matrix: you would set 100 per row as your threshold in this future Spark-RSJ. But you’d get 100 on average; it wouldn’t be guaranteed for any particular item. Overall you’d get 100 * number of rows values, and they’d be the ones with the highest similarity scores of all those created. Do you think this would help?
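
The difference between "k per row on average" and "k per row guaranteed" can be illustrated on a toy similarity matrix. This is only a sketch of the idea as described above, in plain Scala collections rather than Spark or the Mahout DSL; the object name, the method names and the Entry type are hypothetical.

```scala
object ThresholdSketch {
  // ((rowId, colId), similarity); a hypothetical stand-in for similarity matrix entries.
  type Entry = ((Int, Int), Double)

  /** Guaranteed variant: keep the k strongest entries of every row. */
  def topKPerRow(entries: Seq[Entry], k: Int): Seq[Entry] =
    entries
      .groupBy(entry => entry._1._1)
      .values
      .flatMap(row => row.sortBy(e => -e._2).take(k))
      .toSeq

  /** Average variant: keep the (k * number of rows) strongest entries overall. */
  def topKOnAverage(entries: Seq[Entry], k: Int): Seq[Entry] = {
    val numRows = entries.map(entry => entry._1._1).distinct.size
    entries.sortBy(e => -e._2).take(k * numRows)
  }
}
```

With rows of very different strength, the average variant can leave a weak row with no entries at all, while the per-row variant always keeps up to k for each row.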
