You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Niranda Perera <ni...@gmail.com> on 2015/09/02 06:35:13 UTC

taking an n number of rows from and RDD starting from an index

Hi all,

I have a large set of data which would not fit into the memory. So, I wan
to take n number of data from the RDD given a particular index. for an
example, take 1000 rows starting from the index 1001.

I see that there is a  take(num: Int): Array[T] method in the RDD, but it
only returns the 'first n number of rows'.

the simplest use case of this, requirement is, say, I write a custom
relation provider with a custom relation extending the InsertableRelation.

say I submit this query,
"insert into table abc select * from xyz sort by x asc"

in my custom relation, I have implemented the def insert(data: DataFrame,
overwrite: Boolean): Unit
method. here, since the data is large, I can not call methods such as
DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
As you could see, the resultant DF from the "select * from xyz sort by x
asc" is sorted, and if I sun, foreachpartition on that DF and implement the
insert method, this sorted order would be affected, since the inserting
operation would be done in parallel in each partition.

in order to handle this, my initial idea was to take rows from the RDD in
batches and do the insert operation, and for that I was looking for a
method to take n number of rows starting from a given index.

is there any better way to handle this, in RDDs?

your assistance in this regard is highly appreciated.

cheers

-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
https://pythagoreanscript.wordpress.com/

Re: taking an n number of rows from and RDD starting from an index

Posted by Niranda Perera <ni...@gmail.com>.
Hi all,

thank you for your response.

after taking a look at the implementations of rdd.collect(), I thought of
using the rdd.runJob(...) method .

for (int i = 0; i < dataFrame.rdd().partitions().length; i++) {
                dataFrame.sqlContext().sparkContext().runJob(data.rdd(),
some function, { i } , false, ClassTag$.MODULE$.Unit());
            }

this iterates through the partitions of the dataframe.

I would like to know if this is an accepted way of iterating through
dataFrame partitions while conserving the order of rows encapsulated by the
dataframe?

cheers


On Wed, Sep 2, 2015 at 12:33 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

> Hi,
>
> Maybe you could use zipWithIndex and filter to skip the first elements.
> For example starting from
>
> scala> sc.parallelize(100 to 120, 4).zipWithIndex.collect
> res12: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3),
> (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11),
> (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18),
> (119,19), (120,20))
>
> we can get the 3 first elements starting from the 4th (counting from 0) as
>
> scala> sc.parallelize(100 to 120, 4).zipWithIndex.filter(_._2 >=4).take(3)
> res14: Array[(Int, Long)] = Array((104,4), (105,5), (106,6))
>
> Hope that helps
>
>
> 2015-09-02 8:52 GMT+02:00 Hemant Bhanawat <he...@gmail.com>:
>
>> I think rdd.toLocalIterator is what you want. But it will keep one
>> partition's data in-memory.
>>
>> On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera <niranda.perera@gmail.com
>> > wrote:
>>
>>> Hi all,
>>>
>>> I have a large set of data which would not fit into the memory. So, I
>>> wan to take n number of data from the RDD given a particular index. for an
>>> example, take 1000 rows starting from the index 1001.
>>>
>>> I see that there is a  take(num: Int): Array[T] method in the RDD, but
>>> it only returns the 'first n number of rows'.
>>>
>>> the simplest use case of this, requirement is, say, I write a custom
>>> relation provider with a custom relation extending the InsertableRelation.
>>>
>>> say I submit this query,
>>> "insert into table abc select * from xyz sort by x asc"
>>>
>>> in my custom relation, I have implemented the def insert(data:
>>> DataFrame, overwrite: Boolean): Unit
>>> method. here, since the data is large, I can not call methods such as
>>> DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
>>> As you could see, the resultant DF from the "select * from xyz sort by x
>>> asc" is sorted, and if I sun, foreachpartition on that DF and implement the
>>> insert method, this sorted order would be affected, since the inserting
>>> operation would be done in parallel in each partition.
>>>
>>> in order to handle this, my initial idea was to take rows from the RDD
>>> in batches and do the insert operation, and for that I was looking for a
>>> method to take n number of rows starting from a given index.
>>>
>>> is there any better way to handle this, in RDDs?
>>>
>>> your assistance in this regard is highly appreciated.
>>>
>>> cheers
>>>
>>> --
>>> Niranda
>>> @n1r44 <https://twitter.com/N1R44>
>>> https://pythagoreanscript.wordpress.com/
>>>
>>
>>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
https://pythagoreanscript.wordpress.com/

Re: taking an n number of rows from and RDD starting from an index

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi,

Maybe you could use zipWithIndex and filter to skip the first elements. For
example starting from

scala> sc.parallelize(100 to 120, 4).zipWithIndex.collect
res12: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3),
(104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11),
(112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18),
(119,19), (120,20))

we can get the 3 first elements starting from the 4th (counting from 0) as

scala> sc.parallelize(100 to 120, 4).zipWithIndex.filter(_._2 >=4).take(3)
res14: Array[(Int, Long)] = Array((104,4), (105,5), (106,6))

Hope that helps


2015-09-02 8:52 GMT+02:00 Hemant Bhanawat <he...@gmail.com>:

> I think rdd.toLocalIterator is what you want. But it will keep one
> partition's data in-memory.
>
> On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera <ni...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I have a large set of data which would not fit into the memory. So, I wan
>> to take n number of data from the RDD given a particular index. for an
>> example, take 1000 rows starting from the index 1001.
>>
>> I see that there is a  take(num: Int): Array[T] method in the RDD, but it
>> only returns the 'first n number of rows'.
>>
>> the simplest use case of this, requirement is, say, I write a custom
>> relation provider with a custom relation extending the InsertableRelation.
>>
>> say I submit this query,
>> "insert into table abc select * from xyz sort by x asc"
>>
>> in my custom relation, I have implemented the def insert(data: DataFrame,
>> overwrite: Boolean): Unit
>> method. here, since the data is large, I can not call methods such as
>> DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
>> As you could see, the resultant DF from the "select * from xyz sort by x
>> asc" is sorted, and if I sun, foreachpartition on that DF and implement the
>> insert method, this sorted order would be affected, since the inserting
>> operation would be done in parallel in each partition.
>>
>> in order to handle this, my initial idea was to take rows from the RDD in
>> batches and do the insert operation, and for that I was looking for a
>> method to take n number of rows starting from a given index.
>>
>> is there any better way to handle this, in RDDs?
>>
>> your assistance in this regard is highly appreciated.
>>
>> cheers
>>
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> https://pythagoreanscript.wordpress.com/
>>
>
>

Re: taking an n number of rows from and RDD starting from an index

Posted by Hemant Bhanawat <he...@gmail.com>.
I think rdd.toLocalIterator is what you want. But it will keep one
partition's data in-memory.

On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera <ni...@gmail.com>
wrote:

> Hi all,
>
> I have a large set of data which would not fit into the memory. So, I wan
> to take n number of data from the RDD given a particular index. for an
> example, take 1000 rows starting from the index 1001.
>
> I see that there is a  take(num: Int): Array[T] method in the RDD, but it
> only returns the 'first n number of rows'.
>
> the simplest use case of this, requirement is, say, I write a custom
> relation provider with a custom relation extending the InsertableRelation.
>
> say I submit this query,
> "insert into table abc select * from xyz sort by x asc"
>
> in my custom relation, I have implemented the def insert(data: DataFrame,
> overwrite: Boolean): Unit
> method. here, since the data is large, I can not call methods such as
> DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
> As you could see, the resultant DF from the "select * from xyz sort by x
> asc" is sorted, and if I sun, foreachpartition on that DF and implement the
> insert method, this sorted order would be affected, since the inserting
> operation would be done in parallel in each partition.
>
> in order to handle this, my initial idea was to take rows from the RDD in
> batches and do the insert operation, and for that I was looking for a
> method to take n number of rows starting from a given index.
>
> is there any better way to handle this, in RDDs?
>
> your assistance in this regard is highly appreciated.
>
> cheers
>
> --
> Niranda
> @n1r44 <https://twitter.com/N1R44>
> https://pythagoreanscript.wordpress.com/
>