Posted to user@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2014/07/22 01:40:07 UTC

Re: How to map each line to (line number, line)?

I'm not sure whether you guys ever settled on a preferred method for this,
but I just ran into the same problem and came up with the method below,
which is working reasonably well on a small dataset.  It makes two passes
over the RDD (one to count the rows in each partition, one to assign the
numbers), and it should generalize easily to non-String RDDs.

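import org.apache.spark.rdd.RDD

def addRowNumber(r: RDD[String]): RDD[(Long, String)] = {
    val sc = r.sparkContext
    // Pass 1: count the rows in each partition.
    val partitionSizes = r.mapPartitionsWithIndex( (index, rows) =>
        Iterator( (index, rows.size.toLong) ) ).collect
    // Running total, kept as a Long so the total row count can exceed Int.MaxValue.
    // Entry i is the global index of the first row in partition i.
    val partitionGlobalStartIndex =
        partitionSizes.sortBy(_._1).map(_._2).scanLeft(0L)(_+_)

    val startIndexes = sc.broadcast(partitionGlobalStartIndex)
    // Pass 2: add each partition's start index to the row's position within it.
    r.mapPartitionsWithIndex( (partitionIndex, rows) => {
        val partitionStartIndex = startIndexes.value(partitionIndex)
        rows.zipWithIndex map { case (row, rowIndex) =>
            (partitionStartIndex + rowIndex, row)
        }
    })
}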
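
To make the offset arithmetic concrete, here is a minimal sketch of the same
idea on plain Scala collections, with no Spark involved.  The object name
RowNumberSketch and the nested Seq standing in for an RDD's partitions are
assumptions for illustration only; the point is just to show what the running
total of partition sizes looks like when one partition is empty.

```scala
// Plain Scala collections standing in for an RDD's partitions (no Spark needed).
object RowNumberSketch {
  // Hypothetical helper: each inner Seq plays the role of one partition.
  def addRowNumber[T](partitions: Seq[Seq[T]]): Seq[Seq[(Long, T)]] = {
    // Running total of partition sizes; entry i is the global index of
    // the first row in partition i.
    val startIndexes = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zipWithIndex.map { case (rows, partitionIndex) =>
      val start = startIndexes(partitionIndex)
      rows.zipWithIndex.map { case (row, rowIndex) => (start + rowIndex, row) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Partition sizes 2, 0, 3 give start indexes 0, 2, 2 (plus a final total of 5).
    val partitions = Seq(Seq("a", "b"), Seq.empty[String], Seq("c", "d", "e"))
    addRowNumber(partitions).flatten.foreach(println)
  }
}
```

Since the RDD version reads the input twice, it is probably worth caching the
RDD first if recomputing it is expensive.  Spark 1.0 also added
RDD.zipWithIndex, which returns (element, index) pairs and may be enough on
its own if you are on that version.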


On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel <gu...@exensa.com>
wrote:

>  I'm not very comfortable with the idea of generating a rdd from the range
> (it might take a lot of memory), dispatching it to the nodes, then zipping.
>
> You should try and compare the two approaches and give us the performance
> comparison.
>
> Guillaume
>
>     Why not use a zipped RDD?
>>
>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>>
>
>  I do not know why no one else suggested this. Of course it has 3 extra
> loops (one for counting rdd, one for generating the range, one for
> zipping). Apart from this performance problem, any other caveats?
>
>
>>
>>  I have used something like this in the past.
>>
>>  > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
>>  > val rddWithIndex = rdd.zip(index)
>>
>>  If that doesn't work, then you could try zipPartitions as well, since
>> it has slightly more relaxed constraints.
>>
>>