Posted to user@spark.apache.org by "K. Shankari" <sh...@eecs.berkeley.edu> on 2014/01/01 10:35:52 UTC

Re: How to map each line to (line number, line)?

Why not use a zipped RDD?
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD

I have used something like this in the past.

> val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> val rddWithIndex = rdd.zip(index)

If that doesn't work, then you could try zipPartitions as well, since it
has slightly more relaxed constraints.

Thanks,
Shankari
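
For reference, zip only lines up when both RDDs have the same number of partitions and the same number of elements in each partition. An RDD read from a file can have uneven partitions, while a parallelized range is split evenly, so the two can disagree. Below is a minimal sketch of that check in plain Scala with no Spark dependency. `evenSliceSizes` and `zipCompatible` are hypothetical helpers, and the even split is an illustrative assumption rather than a description of Spark's internals:

```scala
object ZipConstraintSketch {
  // Sizes of the slices when n elements are split as evenly as possible into
  // k slices. Illustrative assumption only, not Spark's actual slicing code.
  def evenSliceSizes(n: Long, k: Int): Seq[Long] =
    (0 until k).map(i => ((i + 1) * n) / k - (i * n) / k)

  // An element-wise zip of two partitioned datasets lines up only if the
  // per-partition sizes agree position by position.
  def zipCompatible(a: Seq[Long], b: Seq[Long]): Boolean = a == b
}
```

For 10 elements in 3 slices this gives sizes 3, 3 and 4, which would not match file partitions holding, say, 5, 2 and 3 lines.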


On Tue, Dec 31, 2013 at 11:39 AM, Christopher Nguyen <ct...@adatao.com> wrote:

> It's a reasonable ask (row indices) in some interactive use cases we've
> come across. We're working on providing support for this at a higher level
> of abstraction.
>
> Sent while mobile. Pls excuse typos etc.
> On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <bu...@gmail.com>
> wrote:
>
>>
>>
>>
>> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <an...@andrewash.com> wrote:
>>
>>> Hi Aureliano,
>>>
>>> It's very easy to get lines into (start byte number, line) using
>>> Hadoop's TextInputFormat.  See how SparkContext's textFile() method does it
>>> here:
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>>
>>
>> Thanks for pointing this out. While the start byte number provides a globally
>> unique index for each line, my application needs the line number.
>>
>> It seems best to go with the source file containing the line numbers,
>> instead of recreating this in Hadoop/Spark.
>>
>>
>>>
>>> What is the use case where you must have the global line number in the
>>> file, vs a global ordered unique identifier (my suggestion above) or a
>>> partition-local line number (discussed extensively below)?
>>>
>>> Also if you have any way to do this in plain Hadoop, Spark can use that
>>> as well.
>>>
>>> The fundamental difficulty is that knowing global line number breaks the
>>> assumption Hadoop makes everywhere that each record is independent of all
>>> the others.  Maybe you should consider adding a line number to the
>>> beginning of every line on import time into HDFS instead of doing it
>>> afterwards in Spark.
>>>
>>> Cheers!
>>> Andrew
>>>
>>>
>>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <
>>> buendia360@gmail.com> wrote:
>>>
>>>> I assumed that the number of lines in each partition, except the last
>>>> partition, is equal. Isn't this the case? In that case Guillaume's approach
>>>> makes sense.
>>>>
>>>> All of these methods are inefficient. Spark needs to support this
>>>> feature at a lower level, as Michael suggested.
>>>>
>>>>
>>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
>>>> guillaume.pitel@exensa.com> wrote:
>>>>
>>>>>  You're assuming each partition has the same line count. I don't
>>>>> think it's true (actually, I'm almost certain it's false). And anyway your
>>>>> code also requires two maps.
>>>>>
>>>>> In my code, the sorting as well as the other operations are performed
>>>>> on a very small dataset: one element per partition.
>>>>>
>>>>> Guillaume
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>>>> wrong direction, so change it to use -i instead of i.
>>>>>>
>>>>>
>>>>>  I'm confused about why we would need in-memory sorting. We just use a
>>>>> loop like any other loop in Spark. Why shouldn't this solve the problem?
>>>>>
>>>>>     val count = lines.count() // lines is the rdd
>>>>>     // assumes every partition holds the same number of lines
>>>>>     val partitionLinesCount = count / lines.partitions.length
>>>>>     val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>>       var i = pi * partitionLinesCount
>>>>>       it.map { line =>
>>>>>         val indexed = (i, line)
>>>>>         i += 1
>>>>>         indexed
>>>>>       }
>>>>>     }
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>  *Guillaume PITEL, Président*
>>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>>
>>>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>>>
>>>>
>>>>
>>>
>>
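
The offset `pi * partitionLinesCount` in the loop quoted above is only correct when every partition holds the same number of lines. Here is a minimal sketch in plain Scala of what happens otherwise, with partitions modelled as nested collections. That modelling is an assumption for illustration, and `EqualSizeAssumptionSketch` and `naiveIndex` are hypothetical names:

```scala
object EqualSizeAssumptionSketch {
  // Index lines the way the quoted loop does: partition pi is assumed to start
  // at pi * (count / numPartitions), whatever the real partition sizes are.
  // Requires at least one partition (the division would otherwise fail).
  def naiveIndex(partitions: Seq[Seq[String]]): Seq[(Long, String)] = {
    val count = partitions.map(_.size.toLong).sum
    val perPartition = count / partitions.length
    partitions.zipWithIndex.flatMap { case (rows, pi) =>
      rows.zipWithIndex.map { case (row, i) => (pi * perPartition + i, row) }
    }
  }
}
```

With partitions of 3, 1 and 2 lines, the assigned indices are 0, 1, 2, 2, 4, 5. Index 2 is used twice and index 3 is never assigned.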

Re: How to map each line to (line number, line)?

Posted by Andrew Ash <an...@andrewash.com>.
I'm not sure if you guys ever picked a preferred method for doing this, but
I just encountered it and came up with this method that's working
reasonably well on a small dataset.  It should be quite easily
generalizable to non-String RDDs.

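import org.apache.spark.rdd.RDD

def addRowNumber(r: RDD[String]): RDD[(Long, String)] = {
    val sc = r.sparkContext
    // Pass 1: one (partitionIndex, rowCount) pair per partition.
    val partitionSizes = r.mapPartitionsWithIndex( (index, rows) =>
        Iterator( (index, rows.size.toLong) ) ).collect
    // Running total of the sizes gives each partition's global start index.
    // Long is used so the total cannot overflow Int on large datasets.
    val partitionGlobalStartIndex =
        partitionSizes.sortBy(_._1).map(_._2).scanLeft(0L)(_ + _)

    val startIndexes = sc.broadcast(partitionGlobalStartIndex)
    // Pass 2: offset each partition-local index by the partition's start index.
    r.mapPartitionsWithIndex( (partitionIndex, rows) => {
        val partitionStartIndex = startIndexes.value(partitionIndex)
        rows.zipWithIndex map { case (row, rowIndex) =>
            (partitionStartIndex + rowIndex, row)
        }
    })
}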
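
The two passes in addRowNumber above can be illustrated without a cluster. The following is a minimal sketch in plain Scala that models partitions as nested collections. That modelling is an assumption made for illustration, and `RowNumberSketch` is a hypothetical name, not part of Spark:

```scala
object RowNumberSketch {
  // Partitions are plain Scala collections here and may hold different
  // numbers of lines, including none.
  def addRowNumber(partitions: Seq[Seq[String]]): Seq[Seq[(Long, String)]] = {
    // Pass 1: count each partition, then take a running total so that
    // startIndexes(p) is the global index of the first line in partition p.
    val startIndexes = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    // Pass 2: offset each partition-local index by the partition's start index.
    partitions.zipWithIndex.map { case (rows, p) =>
      rows.zipWithIndex.map { case (row, i) => (startIndexes(p) + i, row) }
    }
  }
}
```

With partitions of 3, 1 and 2 lines, the start indexes are 0, 3 and 4, so the lines are numbered 0 to 5 with no gaps or repeats. Later Spark releases (1.0 onward, as far as I know) also provide RDD.zipWithIndex, which may make a hand-written version unnecessary.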


On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel <gu...@exensa.com>
wrote:

>  I'm not very comfortable with the idea of generating an RDD from the range
> (it might take a lot of memory), dispatching it to the nodes, then zipping.
>
> You should try and compare the two approaches and give us the performance
> comparison.
>
> Guillaume
>
>     Why not use a zipped RDD?
>>
>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>>
>
>  I do not know why no one else suggested this. Of course it has 3 extra
> loops (one for counting rdd, one for generating the range, one for
> zipping). Apart from this performance problem, any other caveats?
>
>
>>
>>  I have used something like this in the past.
>>
>>  > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
>>  > val rddWithIndex = rdd.zip(index)
>>
>>  If that doesn't work, then you could try zipPartitions as well, since
>> it has slightly more relaxed constraints.
>>
>>
>

Re: How to map each line to (line number, line)?

Posted by Guillaume Pitel <gu...@exensa.com>.
I'm not very comfortable with the idea of generating an RDD from the range (it
might take a lot of memory), dispatching it to the nodes, then zipping.

You should try and compare the two approaches and give us the performance
comparison.

Guillaume
>
>     Why not use a zipped RDD?
>     http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>
>
> I do not know why no one else suggested this. Of course it has 3 extra loops
> (one for counting rdd, one for generating the range, one for zipping). Apart
> from this performance problem, any other caveats?
>  
>
>
>     I have used something like this in the past.
>
>     > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
>     > val rddWithIndex = rdd.zip(index)
>
>     If that doesn't work, then you could try zipPartitions as well, since it
>     has slightly more relaxed constraints.
>

-- 
*Guillaume PITEL, Président*
+33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: How to map each line to (line number, line)?

Posted by Aureliano Buendia <bu...@gmail.com>.
On Wed, Jan 1, 2014 at 9:35 AM, K. Shankari <sh...@eecs.berkeley.edu> wrote:

> Why not use a zipped RDD?
>
> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>

I do not know why no one else suggested this. Of course it has 3 extra
loops (one for counting rdd, one for generating the range, one for
zipping). Apart from this performance problem, any other caveats?


>
> I have used something like this in the past.
>
> > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> > val rddWithIndex = rdd.zip(index)
>
>  If that doesn't work, then you could try zipPartitions as well, since it
> has slightly more relaxed constraints.
>