You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Philip Ogren <ph...@oracle.com> on 2013/11/08 22:15:16 UTC

code review - counting populated columns

Hi Spark coders,

I wrote my first little Spark job that takes columnar data and counts up 
how many times each column is populated in an RDD.  Here is the code I 
came up with:

     //RDD of List[String] corresponding to tab delimited values
     val columns = spark.textFile("myfile.tsv").map(line => 
line.split("\t").toList)
     //RDD of List[Int] corresponding to populated columns (1 for 
populated and 0 for not populated)
     val populatedColumns = columns.map(row => row.map(column => 
if(column.length > 0) 1 else 0))
     //List[Int] contains sums of the 1's in each column
     val counts = populatedColumns.reduce((row1,row2) 
=>(row1,row2).zipped.map(_+_))

Any thoughts about the fitness of this code snippet?  I'm a little 
annoyed by creating an RDD full of 1's and 0's in the second line.  The 
if statement feels awkward too.  I was happy to find the zipped method 
for the reduce step.  Any feedback you might have on how to improve this 
code is appreciated.  I'm a newbie to both Scala and Spark.

Thanks,
Philip


Re: code review - counting populated columns

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Philip,

Your code is exactly what I was suggesting. I didn't explain it
clearly, when I said "emit XX", I just meant "figure out how to do XX
and return the result" there isn't actually a function called 'emit'.

In your case, the correct way to do it was using zipWithIndex... I
just couldn't remember off the top of my head how to do that.

- Patrick

On Sat, Nov 9, 2013 at 4:41 AM, Tom Vacek <mi...@gmail.com> wrote:
> Patrick, you got me thinking, but I'm sticking to my opinion that
> reduceByKey should be avoided if possible.  I tried some timings:
>
> def time[T](code : => T) =  {
>         val t0 = System.nanoTime : Double
>         val res = code
>         val t1 = System.nanoTime : Double
>         println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
>         res
> }
> val sparsity=.001
> val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex( (id, it) =>
> {val rng = new scala.util.Random(id+42); it.map(row => (0 until
> 10000).filter(i => rng.nextDouble>1-sparsity).map(i => (i,1)) )}
> ).map(_.toArray).cache
> val rowsFlat = rows.flatMap(rr => rr).cache
>
> rows.count
> rowsFlat.count
>
> val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
> //Elapsed time 725.394917 msecs
>
> val cSums2 = time( rows.mapPartitions(it =>
> Array(it.foldLeft(Array.fill(10000)(0))((acc,nn) =>
> {nn.foreach(tt=>acc(tt._1)+=1); acc})).iterator).reduce( (r1,r2) =>
> r1.zip(r2).map(tt => tt._1 + tt._2)))
> //Elapsed time 206.962364 msecs
>
> These are the best times over a small number of runs, but average case
> showed the same behavior.
> The merge reduction I had suggested was not even close, which doesn't
> surprise me much on second thought.
>
> At sparsity=.01, the times are 2447 v. 394.
>
> Lesson 1: You would care about this in an iterative algorithm, but not in a
> one-off application.
> Lesson 2: Shuffle is slow in comparison, even for a small number of
> elements.
> Lesson 3: Spark would be even cooler with highly optimized reduce and
> broadcast.
>
>
>
> On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <ph...@oracle.com>
> wrote:
>>
>> Thank you for the pointers.  I'm not sure I was able to fully understand
>> either of your suggestions but here is what I came up with.  I started with
>> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
>> Any thoughts about my updated solution are more than welcome!  I added local
>> variable types for clarify.
>>
>>   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
>>     //split by tab and zip with index to give column value, column index
>> pairs
>>     val sparse : RDD[(String, Int)] = tsv.flatMap(line =>
>> line.split("\t").zipWithIndex)
>>     //filter out all the zero length values
>>     val dense : RDD[(String, Int)] = sparse.filter(valueIndex =>
>> valueIndex._1.length>0)
>>     //map each column index to one and do the usual reduction
>>     dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
>>   }
>>
>> Of course, this can be condensed to a single line but it doesn't seem as
>> easy to read as the more verbose code above.  Write-once code like the
>> following is why I never liked Perl....
>>
>>   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
>>     tsv.flatMap(_.split("\t").zipWithIndex).filter(ci =>
>> ci._1.length>0).map(ci => (ci._2, 1)).reduceByKey(_+_)
>>   }
>>
>> Thanks,
>> Philip
>>
>>
>>
>> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>>>
>>> Hey Tom,
>>>
>>> reduceByKey will reduce locally on all the nodes, so there won't be
>>> any data movement except to combine totals at the end.
>>>
>>> - Patrick
>>>
>>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <mi...@gmail.com> wrote:
>>>>
>>>> Your example requires each row to be exactly the same length, since
>>>> zipped
>>>> will truncate to the shorter of its two arguments.
>>>>
>>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>>> of
>>>> data around to sort the keys.  I suspect it would be a lot slower.  But
>>>> you
>>>> could save yourself from adding up a bunch of zeros:
>>>>
>>>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>>>> sparseRows.reduce(mergeAdd(_,_))
>>>>
>>>> You'll have to write a mergeAdd function.  This might not be any faster,
>>>> but
>>>> it does allow variable length rows.
>>>>
>>>>
>>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com>
>>>> wrote:
>>>>>
>>>>> It would be a bit more straightforward to write it like this:
>>>>>
>>>>> val columns = [same as before]
>>>>>
>>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>>> column).reduceByKey(_+ _)
>>>>>
>>>>> Basically look at each row and emit several records using flatMap.
>>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>>> whether it's present.
>>>>>
>>>>> Then you reduce by key to get the per-column count. Then you can
>>>>> collect at the end.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
>>>>> wrote:
>>>>>>
>>>>>> Hi Spark coders,
>>>>>>
>>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>>> up
>>>>>> how
>>>>>> many times each column is populated in an RDD.  Here is the code I
>>>>>> came
>>>>>> up
>>>>>> with:
>>>>>>
>>>>>>      //RDD of List[String] corresponding to tab delimited values
>>>>>>      val columns = spark.textFile("myfile.tsv").map(line =>
>>>>>> line.split("\t").toList)
>>>>>>      //RDD of List[Int] corresponding to populated columns (1 for
>>>>>> populated
>>>>>> and 0 for not populated)
>>>>>>      val populatedColumns = columns.map(row => row.map(column =>
>>>>>> if(column.length > 0) 1 else 0))
>>>>>>      //List[Int] contains sums of the 1's in each column
>>>>>>      val counts = populatedColumns.reduce((row1,row2)
>>>>>> =>(row1,row2).zipped.map(_+_))
>>>>>>
>>>>>> Any thoughts about the fitness of this code snippet?  I'm a little
>>>>>> annoyed
>>>>>> by creating an RDD full of 1's and 0's in the second line.  The if
>>>>>> statement
>>>>>> feels awkward too.  I was happy to find the zipped method for the
>>>>>> reduce
>>>>>> step.  Any feedback you might have on how to improve this code is
>>>>>> appreciated.  I'm a newbie to both Scala and Spark.
>>>>>>
>>>>>> Thanks,
>>>>>> Philip
>>>>>>
>>>>
>>
>

Re: code review - counting populated columns

Posted by Tom Vacek <mi...@gmail.com>.
Patrick, you got me thinking, but I'm sticking to my opinion that
reduceByKey should be avoided if possible.  I tried some timings:

def time[T](code : => T) =  {
        val t0 = System.nanoTime : Double
        val res = code
        val t1 = System.nanoTime : Double
        println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
        res
}
val sparsity=.001
val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex( (id, it)
=> {val rng = new scala.util.Random(id+42); it.map(row => (0 until
10000).filter(i => rng.nextDouble>1-sparsity).map(i => (i,1)) )}
).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache

rows.count
rowsFlat.count

val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs

val cSums2 = time( rows.mapPartitions(it =>
Array(it.foldLeft(Array.fill(10000)(0))((acc,nn) =>
{nn.foreach(tt=>acc(tt._1)+=1); acc})).iterator).reduce( (r1,r2) =>
r1.zip(r2).map(tt => tt._1 + tt._2)))
//Elapsed time 206.962364 msecs

These are the best times over a small number of runs, but average case
showed the same behavior.
The merge reduction I had suggested was not even close, which doesn't
surprise me much on second thought.

At sparsity=.01, the times are 2447 v. 394.

Lesson 1: You would care about this in an iterative algorithm, but not in a
one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of
elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and
broadcast.



On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <ph...@oracle.com>wrote:

> Thank you for the pointers.  I'm not sure I was able to fully understand
> either of your suggestions but here is what I came up with.  I started with
> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
>  Any thoughts about my updated solution are more than welcome!  I added
> local variable types for clarify.
>
>   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
>     //split by tab and zip with index to give column value, column index
> pairs
>     val sparse : RDD[(String, Int)] = tsv.flatMap(line =>
> line.split("\t").zipWithIndex)
>     //filter out all the zero length values
>     val dense : RDD[(String, Int)] = sparse.filter(valueIndex =>
> valueIndex._1.length>0)
>     //map each column index to one and do the usual reduction
>     dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
>   }
>
> Of course, this can be condensed to a single line but it doesn't seem as
> easy to read as the more verbose code above.  Write-once code like the
> following is why I never liked Perl....
>
>   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
>     tsv.flatMap(_.split("\t").zipWithIndex).filter(ci =>
> ci._1.length>0).map(ci => (ci._2, 1)).reduceByKey(_+_)
>   }
>
> Thanks,
> Philip
>
>
>
> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>
>> Hey Tom,
>>
>> reduceByKey will reduce locally on all the nodes, so there won't be
>> any data movement except to combine totals at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <mi...@gmail.com> wrote:
>>
>>> Your example requires each row to be exactly the same length, since
>>> zipped
>>> will truncate to the shorter of its two arguments.
>>>
>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>> of
>>> data around to sort the keys.  I suspect it would be a lot slower.  But
>>> you
>>> could save yourself from adding up a bunch of zeros:
>>>
>>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>>> sparseRows.reduce(mergeAdd(_,_))
>>>
>>> You'll have to write a mergeAdd function.  This might not be any faster,
>>> but
>>> it does allow variable length rows.
>>>
>>>
>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com>
>>> wrote:
>>>
>>>> It would be a bit more straightforward to write it like this:
>>>>
>>>> val columns = [same as before]
>>>>
>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>> column).reduceByKey(_+ _)
>>>>
>>>> Basically look at each row and emit several records using flatMap.
>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>> whether it's present.
>>>>
>>>> Then you reduce by key to get the per-column count. Then you can
>>>> collect at the end.
>>>>
>>>> - Patrick
>>>>
>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
>>>> wrote:
>>>>
>>>>> Hi Spark coders,
>>>>>
>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>> up
>>>>> how
>>>>> many times each column is populated in an RDD.  Here is the code I came
>>>>> up
>>>>> with:
>>>>>
>>>>>      //RDD of List[String] corresponding to tab delimited values
>>>>>      val columns = spark.textFile("myfile.tsv").map(line =>
>>>>> line.split("\t").toList)
>>>>>      //RDD of List[Int] corresponding to populated columns (1 for
>>>>> populated
>>>>> and 0 for not populated)
>>>>>      val populatedColumns = columns.map(row => row.map(column =>
>>>>> if(column.length > 0) 1 else 0))
>>>>>      //List[Int] contains sums of the 1's in each column
>>>>>      val counts = populatedColumns.reduce((row1,row2)
>>>>> =>(row1,row2).zipped.map(_+_))
>>>>>
>>>>> Any thoughts about the fitness of this code snippet?  I'm a little
>>>>> annoyed
>>>>> by creating an RDD full of 1's and 0's in the second line.  The if
>>>>> statement
>>>>> feels awkward too.  I was happy to find the zipped method for the
>>>>> reduce
>>>>> step.  Any feedback you might have on how to improve this code is
>>>>> appreciated.  I'm a newbie to both Scala and Spark.
>>>>>
>>>>> Thanks,
>>>>> Philip
>>>>>
>>>>>
>>>
>

Re: code review - counting populated columns

Posted by Philip Ogren <ph...@oracle.com>.
Thank you for the pointers.  I'm not sure I was able to fully understand 
either of your suggestions but here is what I came up with.  I started 
with Tom's code but I think I ended up borrowing from Patrick's 
suggestion too.  Any thoughts about my updated solution are more than 
welcome!  I added local variable types for clarify.

   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
     //split by tab and zip with index to give column value, column 
index pairs
     val sparse : RDD[(String, Int)] = tsv.flatMap(line => 
line.split("\t").zipWithIndex)
     //filter out all the zero length values
     val dense : RDD[(String, Int)] = sparse.filter(valueIndex => 
valueIndex._1.length>0)
     //map each column index to one and do the usual reduction
     dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
   }

Of course, this can be condensed to a single line but it doesn't seem as 
easy to read as the more verbose code above.  Write-once code like the 
following is why I never liked Perl....

   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
     tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => 
ci._1.length>0).map(ci => (ci._2, 1)).reduceByKey(_+_)
   }

Thanks,
Philip


On 11/8/2013 2:41 PM, Patrick Wendell wrote:
> Hey Tom,
>
> reduceByKey will reduce locally on all the nodes, so there won't be
> any data movement except to combine totals at the end.
>
> - Patrick
>
> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <mi...@gmail.com> wrote:
>> Your example requires each row to be exactly the same length, since zipped
>> will truncate to the shorter of its two arguments.
>>
>> The second solution is elegant, but reduceByKey involves flying a bunch of
>> data around to sort the keys.  I suspect it would be a lot slower.  But you
>> could save yourself from adding up a bunch of zeros:
>>
>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>> sparseRows.reduce(mergeAdd(_,_))
>>
>> You'll have to write a mergeAdd function.  This might not be any faster, but
>> it does allow variable length rows.
>>
>>
>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>> It would be a bit more straightforward to write it like this:
>>>
>>> val columns = [same as before]
>>>
>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>> column).reduceByKey(_+ _)
>>>
>>> Basically look at each row and emit several records using flatMap.
>>> Each record has an ID for the column (maybe its index) and a flag for
>>> whether it's present.
>>>
>>> Then you reduce by key to get the per-column count. Then you can
>>> collect at the end.
>>>
>>> - Patrick
>>>
>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
>>> wrote:
>>>> Hi Spark coders,
>>>>
>>>> I wrote my first little Spark job that takes columnar data and counts up
>>>> how
>>>> many times each column is populated in an RDD.  Here is the code I came
>>>> up
>>>> with:
>>>>
>>>>      //RDD of List[String] corresponding to tab delimited values
>>>>      val columns = spark.textFile("myfile.tsv").map(line =>
>>>> line.split("\t").toList)
>>>>      //RDD of List[Int] corresponding to populated columns (1 for
>>>> populated
>>>> and 0 for not populated)
>>>>      val populatedColumns = columns.map(row => row.map(column =>
>>>> if(column.length > 0) 1 else 0))
>>>>      //List[Int] contains sums of the 1's in each column
>>>>      val counts = populatedColumns.reduce((row1,row2)
>>>> =>(row1,row2).zipped.map(_+_))
>>>>
>>>> Any thoughts about the fitness of this code snippet?  I'm a little
>>>> annoyed
>>>> by creating an RDD full of 1's and 0's in the second line.  The if
>>>> statement
>>>> feels awkward too.  I was happy to find the zipped method for the reduce
>>>> step.  Any feedback you might have on how to improve this code is
>>>> appreciated.  I'm a newbie to both Scala and Spark.
>>>>
>>>> Thanks,
>>>> Philip
>>>>
>>


Re: code review - counting populated columns

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Tom,

reduceByKey will reduce locally on all the nodes, so there won't be
any data movement except to combine totals at the end.

- Patrick

On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <mi...@gmail.com> wrote:
> Your example requires each row to be exactly the same length, since zipped
> will truncate to the shorter of its two arguments.
>
> The second solution is elegant, but reduceByKey involves flying a bunch of
> data around to sort the keys.  I suspect it would be a lot slower.  But you
> could save yourself from adding up a bunch of zeros:
>
>  val sparseRows = spark.textFile("myfile.tsv").map(line =>
> line.split("\t").zipWithIndex.filter(_._1.length>0))
> sparseRows.reduce(mergeAdd(_,_))
>
> You'll have to write a mergeAdd function.  This might not be any faster, but
> it does allow variable length rows.
>
>
> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>
>> It would be a bit more straightforward to write it like this:
>>
>> val columns = [same as before]
>>
>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>> column).reduceByKey(_+ _)
>>
>> Basically look at each row and emit several records using flatMap.
>> Each record has an ID for the column (maybe its index) and a flag for
>> whether it's present.
>>
>> Then you reduce by key to get the per-column count. Then you can
>> collect at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
>> wrote:
>> > Hi Spark coders,
>> >
>> > I wrote my first little Spark job that takes columnar data and counts up
>> > how
>> > many times each column is populated in an RDD.  Here is the code I came
>> > up
>> > with:
>> >
>> >     //RDD of List[String] corresponding to tab delimited values
>> >     val columns = spark.textFile("myfile.tsv").map(line =>
>> > line.split("\t").toList)
>> >     //RDD of List[Int] corresponding to populated columns (1 for
>> > populated
>> > and 0 for not populated)
>> >     val populatedColumns = columns.map(row => row.map(column =>
>> > if(column.length > 0) 1 else 0))
>> >     //List[Int] contains sums of the 1's in each column
>> >     val counts = populatedColumns.reduce((row1,row2)
>> > =>(row1,row2).zipped.map(_+_))
>> >
>> > Any thoughts about the fitness of this code snippet?  I'm a little
>> > annoyed
>> > by creating an RDD full of 1's and 0's in the second line.  The if
>> > statement
>> > feels awkward too.  I was happy to find the zipped method for the reduce
>> > step.  Any feedback you might have on how to improve this code is
>> > appreciated.  I'm a newbie to both Scala and Spark.
>> >
>> > Thanks,
>> > Philip
>> >
>
>

Re: code review - counting populated columns

Posted by Tom Vacek <mi...@gmail.com>.
Messed up.  Should be
 val sparseRows = spark.textFile("myfile.tsv").map(line =>
line.split("\t").zipWithIndex.flatMap( tt => if(tt._1.length>0) (tt._2, 1) )
Then reduce with a mergeAdd.


On Fri, Nov 8, 2013 at 3:35 PM, Tom Vacek <mi...@gmail.com> wrote:

> Your example requires each row to be exactly the same length, since zipped
> will truncate to the shorter of its two arguments.
>
> The second solution is elegant, but reduceByKey involves flying a bunch of
> data around to sort the keys.  I suspect it would be a lot slower.  But you
> could save yourself from adding up a bunch of zeros:
>
>  val sparseRows = spark.textFile("myfile.tsv").map(line =>
> line.split("\t").zipWithIndex.filter(_._1.length>0))
> sparseRows.reduce(mergeAdd(_,_))
>
> You'll have to write a mergeAdd function.  This might not be any faster,
> but it does allow variable length rows.
>
>
> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com>wrote:
>
>> It would be a bit more straightforward to write it like this:
>>
>> val columns = [same as before]
>>
>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>> column).reduceByKey(_+ _)
>>
>> Basically look at each row and emit several records using flatMap.
>> Each record has an ID for the column (maybe its index) and a flag for
>> whether it's present.
>>
>> Then you reduce by key to get the per-column count. Then you can
>> collect at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
>> wrote:
>> > Hi Spark coders,
>> >
>> > I wrote my first little Spark job that takes columnar data and counts
>> up how
>> > many times each column is populated in an RDD.  Here is the code I came
>> up
>> > with:
>> >
>> >     //RDD of List[String] corresponding to tab delimited values
>> >     val columns = spark.textFile("myfile.tsv").map(line =>
>> > line.split("\t").toList)
>> >     //RDD of List[Int] corresponding to populated columns (1 for
>> populated
>> > and 0 for not populated)
>> >     val populatedColumns = columns.map(row => row.map(column =>
>> > if(column.length > 0) 1 else 0))
>> >     //List[Int] contains sums of the 1's in each column
>> >     val counts = populatedColumns.reduce((row1,row2)
>> > =>(row1,row2).zipped.map(_+_))
>> >
>> > Any thoughts about the fitness of this code snippet?  I'm a little
>> annoyed
>> > by creating an RDD full of 1's and 0's in the second line.  The if
>> statement
>> > feels awkward too.  I was happy to find the zipped method for the reduce
>> > step.  Any feedback you might have on how to improve this code is
>> > appreciated.  I'm a newbie to both Scala and Spark.
>> >
>> > Thanks,
>> > Philip
>> >
>>
>
>

Re: code review - counting populated columns

Posted by Tom Vacek <mi...@gmail.com>.
Your example requires each row to be exactly the same length, since zipped
will truncate to the shorter of its two arguments.

The second solution is elegant, but reduceByKey involves flying a bunch of
data around to sort the keys.  I suspect it would be a lot slower.  But you
could save yourself from adding up a bunch of zeros:

 val sparseRows = spark.textFile("myfile.tsv").map(line =>
line.split("\t").zipWithIndex.filter(_._1.length>0))
sparseRows.reduce(mergeAdd(_,_))

You'll have to write a mergeAdd function.  This might not be any faster,
but it does allow variable length rows.


On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pw...@gmail.com> wrote:

> It would be a bit more straightforward to write it like this:
>
> val columns = [same as before]
>
> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
> column).reduceByKey(_+ _)
>
> Basically look at each row and emit several records using flatMap.
> Each record has an ID for the column (maybe its index) and a flag for
> whether it's present.
>
> Then you reduce by key to get the per-column count. Then you can
> collect at the end.
>
> - Patrick
>
> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com>
> wrote:
> > Hi Spark coders,
> >
> > I wrote my first little Spark job that takes columnar data and counts up
> how
> > many times each column is populated in an RDD.  Here is the code I came
> up
> > with:
> >
> >     //RDD of List[String] corresponding to tab delimited values
> >     val columns = spark.textFile("myfile.tsv").map(line =>
> > line.split("\t").toList)
> >     //RDD of List[Int] corresponding to populated columns (1 for
> populated
> > and 0 for not populated)
> >     val populatedColumns = columns.map(row => row.map(column =>
> > if(column.length > 0) 1 else 0))
> >     //List[Int] contains sums of the 1's in each column
> >     val counts = populatedColumns.reduce((row1,row2)
> > =>(row1,row2).zipped.map(_+_))
> >
> > Any thoughts about the fitness of this code snippet?  I'm a little
> annoyed
> > by creating an RDD full of 1's and 0's in the second line.  The if
> statement
> > feels awkward too.  I was happy to find the zipped method for the reduce
> > step.  Any feedback you might have on how to improve this code is
> > appreciated.  I'm a newbie to both Scala and Spark.
> >
> > Thanks,
> > Philip
> >
>

Re: code review - counting populated columns

Posted by Philip Ogren <ph...@oracle.com>.
Where does 'emit' come from?  I don't see it in the Scala or Spark 
apidocs (though I don't feel very deft at searching either!)

Thanks,
Philip

On 11/8/2013 2:23 PM, Patrick Wendell wrote:
> It would be a bit more straightforward to write it like this:
>
> val columns = [same as before]
>
> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
> column).reduceByKey(_+ _)
>
> Basically look at each row and emit several records using flatMap.
> Each record has an ID for the column (maybe its index) and a flag for
> whether it's present.
>
> Then you reduce by key to get the per-column count. Then you can
> collect at the end.
>
> - Patrick
>
> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com> wrote:
>> Hi Spark coders,
>>
>> I wrote my first little Spark job that takes columnar data and counts up how
>> many times each column is populated in an RDD.  Here is the code I came up
>> with:
>>
>>      //RDD of List[String] corresponding to tab delimited values
>>      val columns = spark.textFile("myfile.tsv").map(line =>
>> line.split("\t").toList)
>>      //RDD of List[Int] corresponding to populated columns (1 for populated
>> and 0 for not populated)
>>      val populatedColumns = columns.map(row => row.map(column =>
>> if(column.length > 0) 1 else 0))
>>      //List[Int] contains sums of the 1's in each column
>>      val counts = populatedColumns.reduce((row1,row2)
>> =>(row1,row2).zipped.map(_+_))
>>
>> Any thoughts about the fitness of this code snippet?  I'm a little annoyed
>> by creating an RDD full of 1's and 0's in the second line.  The if statement
>> feels awkward too.  I was happy to find the zipped method for the reduce
>> step.  Any feedback you might have on how to improve this code is
>> appreciated.  I'm a newbie to both Scala and Spark.
>>
>> Thanks,
>> Philip
>>


Re: code review - counting populated columns

Posted by Patrick Wendell <pw...@gmail.com>.
It would be a bit more straightforward to write it like this:

val columns = [same as before]

val counts = columns.flatMap(emit (col_id, 0 or 1) for each
column).reduceByKey(_+ _)

Basically look at each row and emit several records using flatMap.
Each record has an ID for the column (maybe its index) and a flag for
whether it's present.

Then you reduce by key to get the per-column count. Then you can
collect at the end.

- Patrick

On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <ph...@oracle.com> wrote:
> Hi Spark coders,
>
> I wrote my first little Spark job that takes columnar data and counts up how
> many times each column is populated in an RDD.  Here is the code I came up
> with:
>
>     //RDD of List[String] corresponding to tab delimited values
>     val columns = spark.textFile("myfile.tsv").map(line =>
> line.split("\t").toList)
>     //RDD of List[Int] corresponding to populated columns (1 for populated
> and 0 for not populated)
>     val populatedColumns = columns.map(row => row.map(column =>
> if(column.length > 0) 1 else 0))
>     //List[Int] contains sums of the 1's in each column
>     val counts = populatedColumns.reduce((row1,row2)
> =>(row1,row2).zipped.map(_+_))
>
> Any thoughts about the fitness of this code snippet?  I'm a little annoyed
> by creating an RDD full of 1's and 0's in the second line.  The if statement
> feels awkward too.  I was happy to find the zipped method for the reduce
> step.  Any feedback you might have on how to improve this code is
> appreciated.  I'm a newbie to both Scala and Spark.
>
> Thanks,
> Philip
>