Posted to user@spark.apache.org by Omid Alipourfard <ec...@gmail.com> on 2016/07/07 03:55:30 UTC

Question regarding structured data and partitions

Hi,

Why doesn't Spark keep information about the structure of RDDs or of the
partitions within RDDs?  Say that I use repartitionAndSortWithinPartitions,
which results in sorted partitions.  With sorted partitions, lookups should
be very fast (binary search?), yet I still need to scan the whole
partition to perform a lookup, for example with filter.
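
A minimal sketch of the difference being described, in plain Scala rather
than Spark.  It assumes one partition has already been materialized as an
indexed, key-sorted sequence; Spark hands partitions to user code as
iterators, so that materialization is itself an assumption.  The names
SortedPartitionLookup, binarySearch and scan are made up for illustration.

```scala
import scala.annotation.tailrec

object SortedPartitionLookup {
  // Binary search over one partition whose records are sorted by key.
  // Returns the value of a record with a matching key, if there is one.
  def binarySearch[V](sorted: IndexedSeq[(Int, V)], key: Int): Option[V] = {
    @tailrec
    def go(lo: Int, hi: Int): Option[V] =
      if (lo > hi) None
      else {
        val mid = lo + (hi - lo) / 2
        val (k, v) = sorted(mid)
        if (k == key) Some(v)
        else if (k < key) go(mid + 1, hi)
        else go(lo, mid - 1)
      }
    go(0, sorted.length - 1)
  }

  // Linear scan: the filter-style lookup, which may touch every record.
  def scan[V](partition: IndexedSeq[(Int, V)], key: Int): Option[V] =
    partition.collectFirst { case (k, v) if k == key => v }
}
```

Both functions return the same answer on sorted input; the first needs a
logarithmic number of comparisons, the second a linear number.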

To give more context, here is a very simple use case where this feature
seems extremely useful: you have a stream of incoming keys, and for each key
you need to look up the associated value in a large RDD and perform
operations on the values.  Right now, performing a join between the RDDs in
the DStream and the large RDD seems to be the way to go.  For example:

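// Join each micro-batch against the large RDD, then process the joined pairs.
incomingData.transform { rdd => largeRdd.join(rdd) }
  .map(performAdditionalOperations).save(...)  // save(...) stands in for an output operation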

Assuming that largeRdd is sorted or contains an index, and that each window
of incomingData is small, this join could be performed in
O(incomingData * log(largeRdd)) time with a sorted largeRdd, or in
O(incomingData) time with a constant-time index.  Yet, right now, I believe
this operation is much more expensive than that.
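
To make the cost estimate concrete, here is a rough plain-Scala model (not
Spark code) of such an indexed join.  SortedStore and IndexedJoin are
hypothetical names: the store stands in for a range-partitioned largeRdd
whose partitions are sorted by key, and each incoming key costs one binary
search over the partition bounds plus one inside the chosen partition.

```scala
import java.util.Arrays

// Hypothetical stand-in for a range-partitioned, per-partition-sorted RDD.
// Partition i holds keys k with upperBounds(i - 1) < k <= upperBounds(i);
// the last partition has no upper bound, so keys.length == upperBounds.length + 1.
final class SortedStore(
    upperBounds: Array[Int],
    keys: Array[Array[Int]],
    values: Array[Array[String]]) {

  // Locate the partition for a key by binary search over the bounds.
  def partitionOf(key: Int): Int = {
    val i = Arrays.binarySearch(upperBounds, key)
    if (i >= 0) i else -(i + 1) // a negative result encodes the insertion point
  }

  // Binary search inside the single partition that can contain the key.
  def get(key: Int): Option[String] = {
    val p = partitionOf(key)
    val j = Arrays.binarySearch(keys(p), key)
    if (j >= 0) Some(values(p)(j)) else None
  }
}

object IndexedJoin {
  // Inner join of a small batch of keys against the store.
  def join(batch: Seq[Int], store: SortedStore): Seq[(Int, String)] =
    batch.flatMap(k => store.get(k).map(v => k -> v))
}
```

The work grows with the batch size times the logarithm of the store size,
instead of with the store size itself.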

I have just started using Spark, so it's highly likely that I am using it
wrong.  So any thoughts are appreciated!

TL;DR: Why not keep an index or other metadata with each partition or RDD to
speed up operations such as lookups, filters, etc.?

Thanks,
Omid

Re: Question regarding structured data and partitions

Posted by tan shai <ta...@gmail.com>.
Thank you for your answer.

Since Spark 1.6.0, it is possible to hash-partition a DataFrame with
repartition (see
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
).
I have also sorted a DataFrame, and the physical plan shows that the sort
uses range partitioning.

So I need to retrieve the partition information produced by the sort.

Any ideas?

2016-07-07 16:59 GMT+02:00 Koert Kuipers <ko...@tresata.com>:

> since dataframes represent more or less a plan of execution, they do not
> have partition information as such i think?
> you could however do dataFrame.rdd, to force it to create a physical plan
> that results in an actual rdd, and then query the rdd for partition info.

Re: Question regarding structured data and partitions

Posted by Koert Kuipers <ko...@tresata.com>.
Since DataFrames represent more or less a plan of execution, I don't think
they have partition information as such.
You could, however, call dataFrame.rdd to force it to create a physical plan
that results in an actual RDD, and then query that RDD for partition info.
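
A hedged sketch of that suggestion.  It assumes a local SparkSession (Spark
2.x or later; on 1.6 a SQLContext would play that role) and a DataFrame whose
first column is a non-null Int key.  PartitionInfo and keyRanges are
hypothetical names, and the minimum and maximum key observed per partition
only approximate the range bounds the sort used.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionInfo {
  // Returns (partitionIndex, minKey, maxKey) for every non-empty partition,
  // assuming the first column of df is a non-null Int key.
  def keyRanges(df: DataFrame): Array[(Int, Int, Int)] =
    df.rdd.mapPartitionsWithIndex { (index, rows) =>
      val keys = rows.map(_.getInt(0))
      if (keys.hasNext) {
        val first = keys.next()
        val (lo, hi) = keys.foldLeft((first, first)) {
          case ((mn, mx), k) => (math.min(mn, k), math.max(mx, k))
        }
        Iterator((index, lo, hi))
      } else Iterator.empty
    }.collect()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("partition-info").getOrCreate()
    import spark.implicits._

    val sorted = (1 to 1000).toDF("id").sort("id")
    val rdd = sorted.rdd // forces a physical plan and yields an RDD[Row]
    // The partitioner may well be None here even though the data is range partitioned.
    println(s"partitions: ${rdd.getNumPartitions}, partitioner: ${rdd.partitioner}")
    keyRanges(sorted).foreach(println)
    spark.stop()
  }
}
```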

On Thu, Jul 7, 2016 at 4:24 AM, tan shai <ta...@gmail.com> wrote:

> Using partitioning with dataframes, how can we retrieve information about
> partitions? Partition bounds, for example.
>
> Thanks,
> Shaira

Re: Question regarding structured data and partitions

Posted by tan shai <ta...@gmail.com>.
When using partitioning with DataFrames, how can we retrieve information
about the partitions (the partition bounds, for example)?

Thanks,
Shaira

2016-07-07 6:30 GMT+02:00 Koert Kuipers <ko...@tresata.com>:

> spark does keep some information on the partitions of an RDD, namely the
> partitioning/partitioner.
>
> GroupSorted is an extension for key-value RDDs that also keeps track of
> the ordering, allowing for faster joins, non-reduce type operations on very
> large groups of values per key, etc.
> see here:
> https://github.com/tresata/spark-sorted
> however no support for streaming (yet)...

Re: Question regarding structured data and partitions

Posted by Koert Kuipers <ko...@tresata.com>.
Spark does keep some information on the partitions of an RDD, namely the
partitioning/partitioner.

GroupSorted is an extension for key-value RDDs that also keeps track of the
ordering, allowing for faster joins, non-reduce type operations on very
large groups of values per key, etc.
See here:
https://github.com/tresata/spark-sorted
However, there is no support for streaming (yet)...
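
The partitioner can be inspected and used from ordinary Spark code.  A small
sketch, assuming a local SparkContext and toy data (PartitionerLookup is a
hypothetical name): after partitionBy the RDD reports its partitioner, and
lookup on a pair RDD is documented to search only the partition the key maps
to when a partitioner is known.  Within that partition it is still a scan,
not a binary search.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerLookup {
  // Returns whether the RDD knows its partitioner, and the values stored for key 3.
  def run(sc: SparkContext): (Boolean, Seq[String]) = {
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d"))
    val partitioned = pairs.partitionBy(new HashPartitioner(4))
    (partitioned.partitioner.isDefined, partitioned.lookup(3))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitioner-lookup")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```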

