Posted to user@spark.apache.org by "K. Shankari" <sh...@eecs.berkeley.edu> on 2014/01/01 10:35:52 UTC
Re: How to map each line to (line number, line)?
Why not use a zipped RDD?
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
I have used something like this in the past.
> val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> val rddWithIndex = rdd.zip(index)
If that doesn't work, then you could try zipPartitions as well, since it
has slightly more relaxed constraints.
Thanks,
Shankari
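
One caveat worth checking before relying on zip: it pairs partition i of one RDD with partition i of the other, so both sides need the same number of partitions and the same number of elements in each partition. The plain-Scala sketch below models partitions as nested Vectors to show how an evenly sliced index can disagree with unevenly filled data partitions. The names ZipConstraintSketch, evenSlices and zippable are hypothetical, and the near-equal slicing rule is an assumption made for illustration, not a statement of how sc.parallelize slices a range.

```scala
object ZipConstraintSketch {
  // Hypothetical stand-in for an RDD: one inner Vector per partition.
  type Partitions[A] = Vector[Vector[A]]

  // Cut `items` into `n` near-equal slices (assumed slicing rule, for illustration only).
  def evenSlices[A](items: Vector[A], n: Int): Partitions[A] =
    (0 until n).map { i =>
      val start = (i.toLong * items.length / n).toInt
      val end = ((i + 1).toLong * items.length / n).toInt
      items.slice(start, end)
    }.toVector

  // True when both sides have the same partition count and matching sizes per partition.
  def zippable[A, B](left: Partitions[A], right: Partitions[B]): Boolean =
    left.length == right.length &&
      left.zip(right).forall { case (l, r) => l.length == r.length }

  def main(args: Array[String]): Unit = {
    // Data partitions of sizes 3, 1 and 4, as a text file split might produce.
    val lines: Partitions[String] = Vector(
      Vector("a", "b", "c"), Vector("d"), Vector("e", "f", "g", "h"))
    val total = lines.map(_.length).sum.toLong
    val index = evenSlices((0L until total).toVector, lines.length)
    println(index.map(_.length))    // prints Vector(2, 3, 3)
    println(zippable(lines, index)) // prints false
  }
}
```

When the per-partition sizes do line up, the same check returns true and a partition-wise zip is safe.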
On Tue, Dec 31, 2013 at 11:39 AM, Christopher Nguyen <ct...@adatao.com> wrote:
> It's a reasonable ask (row indices) in some interactive use cases we've
> come across. We're working on providing support for this at a higher level
> of abstraction.
>
> Sent while mobile. Pls excuse typos etc.
> On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <bu...@gmail.com>
> wrote:
>
>>
>>
>>
>> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <an...@andrewash.com> wrote:
>>
>>> Hi Aureliano,
>>>
>>> It's very easy to get lines into (start byte number, line) using
>>> Hadoop's TextInputFormat. See how SparkContext's textFile() method does it
>>> here:
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>>
>>
>> Thanks for pointing this out. While the start byte number provides a globally
>> unique index for each line, my application needs the line number.
>>
>> It seems best to go with the source file containing the line numbers,
>> instead of recreating this in Hadoop/Spark.
>>
>>
>>>
>>> What is the use case where you must have the global line number in the
>>> file, vs a global ordered unique identifier (my suggestion above) or a
>>> partition-local line number (discussed extensively below)?
>>>
>>> Also if you have any way to do this in plain Hadoop, Spark can use that
>>> as well.
>>>
>>> The fundamental difficulty is that knowing the global line number breaks the
>>> assumption Hadoop makes everywhere that each record is independent of all
>>> the others. Maybe you should consider adding a line number to the
>>> beginning of every line at import time into HDFS instead of doing it
>>> afterwards in Spark.
>>>
>>> Cheers!
>>> Andrew
>>>
>>>
>>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <
>>> buendia360@gmail.com> wrote:
>>>
>>>> I assumed that the number of lines in each partition, except the last
>>>> partition, is equal. Isn't this the case? In that case Guillaume's approach
>>>> makes sense.
>>>>
>>>> All of these methods are inefficient. Spark needs to support this
>>>> feature at a lower level, as Michael suggested.
>>>>
>>>>
>>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
>>>> guillaume.pitel@exensa.com> wrote:
>>>>
>>>>> You're assuming each partition has the same line count. I don't
>>>>> think it's true (actually, I'm almost certain it's false). And anyway your
>>>>> code also requires two maps.
>>>>>
>>>>> In my code, the sorting as well as the other operations are performed
>>>>> on a very small dataset: one element per partition.
>>>>>
>>>>> Guillaume
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> Did you try the code I sent ? I think the sortBy is probably in the
>>>>>> wrong direction, so change it with -i instead of i
>>>>>>
>>>>>
>>>>> I'm confused about why we would need in-memory sorting. We just use a
>>>>> loop like any other loop in Spark. Why wouldn't this solve the problem?
>>>>>
>>>>> val count = lines.count() // lines is the RDD
>>>>> // Assumes every partition holds exactly partitionLinesCount lines.
>>>>> val partitionLinesCount = count / lines.partitions.length
>>>>> val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>>   var i = pi * partitionLinesCount
>>>>>   it.map { line =>
>>>>>     val indexed = (i, line) // pair the line with the current index
>>>>>     i += 1
>>>>>     indexed
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> [image: eXenSa]
>>>>> *Guillaume PITEL, Président*
>>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>>
>>>>> eXenSa S.A.S. <http://www.exensa.com/>
>>>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>>>
>>>>
>>>>
>>>
>>
Re: How to map each line to (line number, line)?
Posted by Andrew Ash <an...@andrewash.com>.
I'm not sure if you guys ever picked a preferred method for doing this, but
I just encountered it and came up with this method that's working
reasonably well on a small dataset. It should be quite easily
generalizable to non-String RDDs.
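import org.apache.spark.rdd.RDD

def addRowNumber(r: RDD[String]): RDD[(Long, String)] = {
  val sc = r.sparkContext
  // First pass: count the rows in each partition (one small record per partition).
  val partitionSizes = r.mapPartitionsWithIndex( (index, rows) =>
    Iterator( (index, rows.size.toLong) ) ).collect
  // Running total: entry i is the global index of the first row in partition i.
  // Long arithmetic avoids overflow past Int.MaxValue rows.
  val partitionGlobalStartIndex =
    partitionSizes.sortBy(_._1).map(_._2).scanLeft(0L)(_+_)
  val startIndexes = sc.broadcast(partitionGlobalStartIndex)
  // Second pass: offset each row's position within its partition.
  r.mapPartitionsWithIndex( (partitionIndex, rows) => {
    val partitionStartIndex = startIndexes.value(partitionIndex)
    rows.zipWithIndex map { case (row, rowIndex) =>
      (partitionStartIndex + rowIndex, row)
    }
  })
}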
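
To make the offset step concrete, here is a minimal plain-Scala sketch of the same two-pass idea, with partitions modelled as nested Vectors so it runs without a cluster. The object name RowNumberSketch and the sample data are hypothetical; it only illustrates the arithmetic and is not a replacement for the RDD version.

```scala
object RowNumberSketch {
  // Hypothetical stand-in for an RDD: one inner Vector per partition.
  def addRowNumber[A](partitions: Vector[Vector[A]]): Vector[Vector[(Long, A)]] = {
    // First pass: one size per partition.
    val sizes = partitions.map(_.length.toLong)
    // Running total; entry p is the global index of the first row in partition p.
    val startIndexes = sizes.scanLeft(0L)(_ + _)
    // Second pass: position within the partition plus that partition's start offset.
    partitions.zipWithIndex.map { case (rows, p) =>
      rows.zipWithIndex.map { case (row, i) => (startIndexes(p) + i, row) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Partition sizes 3, 1 and 4 give start offsets 0, 3 and 4.
    val lines = Vector(Vector("a", "b", "c"), Vector("d"), Vector("e", "f", "g", "h"))
    addRowNumber(lines).flatten.foreach(println) // (0,a) through (7,h), one per line
  }
}
```

Only the per-partition sizes are gathered centrally, so the extra data is one number per partition regardless of how many rows there are.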
On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel <gu...@exensa.com>
wrote:
> I'm not very comfortable with the idea of generating a rdd from the range
> (it might take a lot of memory), dispatching it to the nodes, then zipping.
>
> You should try and compare the two approaches and give us the performance
> comparison.
>
> Guillaume
>
> Why not use a zipped RDD?
>>
>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>>
>
> I do not know why no one else suggested this. Of course it has 3 extra
> loops (one for counting rdd, one for generating the range, one for
> zipping). Apart from this performance problem, any other caveats?
>
>
>>
>> I have used something like this in the past.
>>
>> > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
>> > val rddWithIndex = rdd.zip(index)
>>
>> If that doesn't work, then you could try zipPartitions as well, since
>> it has slightly more relaxed constraints.
>>
>>
Re: How to map each line to (line number, line)?
Posted by Guillaume Pitel <gu...@exensa.com>.
I'm not very comfortable with the idea of generating an RDD from the range (it
might take a lot of memory), dispatching it to the nodes, then zipping.
You should try and compare the two approaches and give us the performance
comparison.
Guillaume
>
> Why not use a zipped RDD?
> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>
>
> I do not know why no one else suggested this. Of course it has 3 extra loops
> (one for counting rdd, one for generating the range, one for zipping). Apart
> from this performance problem, any other caveats?
>
>
>
> I have used something like this in the past.
>
> > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> > val rddWithIndex = rdd.zip(index)
>
> If that doesn't work, then you could try zipPartitions as well, since it
> has slightly more relaxed constraints.
>
--
eXenSa
*Guillaume PITEL, Président*
+33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
Re: How to map each line to (line number, line)?
Posted by Aureliano Buendia <bu...@gmail.com>.
On Wed, Jan 1, 2014 at 9:35 AM, K. Shankari <sh...@eecs.berkeley.edu> wrote:
> Why not use a zipped RDD?
>
> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>
I do not know why no one else suggested this. Of course it has 3 extra
loops (one for counting the RDD, one for generating the range, one for
zipping). Apart from this performance problem, are there any other caveats?
>
> I have used something like this in the past.
>
> > val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> > val rddWithIndex = rdd.zip(index)
>
> If that doesn't work, then you could try zipPartitions as well, since it
> has slightly more relaxed constraints.
>
> Thanks,
> Shankari