Posted to user@spark.apache.org by nsareen <ns...@gmail.com> on 2014/12/02 15:29:46 UTC

Does filter on an RDD scan every data item ?

Hi,

I wanted some clarity on how the filter function of an RDD works.

1) Does the filter function scan every element saved in the RDD? If my RDD
represents 10 million rows, and I want to work on only 1000 of them, is
there an efficient way of filtering the subset without having to scan every
element?

2) If my RDD represents a key/value data set: when I filter this data set
of 10 million rows, can I specify that the search should be restricted to
only partitions which contain specific keys? Will Spark run my filter
operation on all partitions if the partitions are done by key, irrespective
of whether the key exists in a partition or not?



Re: Does filter on an RDD scan every data item ?

Posted by nsareen <ns...@gmail.com>.
Any thoughts on how Spark SQL could help in our scenario?



Re: Does filter on an RDD scan every data item ?

Posted by 诺铁 <no...@gmail.com>.
There is a PartitionPruningRDD:

:: DeveloperApi :: An RDD used to prune RDD partitions so we can
avoid launching tasks on all partitions. An example use case: if we know
the RDD is partitioned by range, and the execution DAG has a filter on the
key, we can avoid launching tasks on partitions that don't have the range
covering the key.

It seems made exactly for this case, but it's marked as DeveloperApi. Does
anyone know how to use it?
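From the API it looks like something along these lines might work (an
untested sketch; "pairs" and "key" are placeholders, and it assumes the
RDD was hash partitioned with the same partitioner up front):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD functions
import org.apache.spark.rdd.PartitionPruningRDD

val partitioner = new HashPartitioner(100)
val partitionedPairs = pairs.partitionBy(partitioner).cache()

// prune to the single partition the key hashes to, then filter within it
val target = partitioner.getPartition(key)
val pruned = PartitionPruningRDD.create(partitionedPairs, idx => idx == target)
val matches = pruned.filter { case (k, _) => k == key }.collect()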



On Mon, Dec 8, 2014 at 11:31 AM, nsareen <ns...@gmail.com> wrote:

> @Sowen, I would appreciate it if you could explain how Spark SQL would
> help in my scenario.

Re: Does filter on an RDD scan every data item ?

Posted by nsareen <ns...@gmail.com>.
@Sowen, I would appreciate it if you could explain how Spark SQL would
help in my scenario.



Re: Does filter on an RDD scan every data item ?

Posted by nsareen <ns...@gmail.com>.
Thanks! Shall try it out.





Re: Does filter on an RDD scan every data item ?

Posted by dsiegel <de...@gmail.com>.
Also, you may want to use .lookup() instead of .filter():

def lookup(key: K): Seq[V]
Return the list of values in the RDD for the given key. This operation is
done efficiently if the RDD has a known partitioner, by only searching the
partition that the key maps to.

You might want to partition your first batch of data with .partitionBy(),
using your CustomTuple hash implementation, persist it, and avoid running
any operations on it that can remove its partitioner object.
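e.g. (a rough sketch; "tuples" stands for the RDD[(CustomTuple, Double)]
described in the earlier post, and "someKey" for a CustomTuple built on
the driver):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD functions

// one-time shuffle into a known partitioning, kept cached
val partitioned = tuples.partitionBy(new HashPartitioner(64)).persist()
partitioned.count()   // force materialization

// subsequent lookups only scan the partition the key hashes to
val values: Seq[Double] = partitioned.lookup(someKey)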

Re: Does filter on an RDD scan every data item ?

Posted by nsareen <ns...@gmail.com>.
I'm not sure sample is what I was looking for.

As mentioned in another post above, this is what I'm looking for:

1) My RDD contains this structure: Tuple2<CustomTuple,Double>.
2) Each CustomTuple is a combination of string ids, e.g.
CustomTuple.dimensionOne="AE232323"
CustomTuple.dimensionTwo="BE232323"
CustomTuple.dimensionThree="CE232323"
and so on.
3) CustomTuple has overridden equals & hashCode implementations, which
identify unique objects and establish equality when the values of
dimensionOne, Two and Three match for two distinct objects.
4) Double is a numeric value.
5) I want to create an RDD of 50-100 million or more such tuples in Spark,
and it can grow over time.
6) My web application would request to process a subset of these millions
of rows. The processing is nothing but aggregation / arithmetic functions
over this data set. We felt Spark would be the right candidate to process
this in a distributed fashion, and it would also help scalability in the
future. Where we are stuck is that, in case the application requests a
subset comprising 100 thousand tuples, we would have to construct that many
CustomTuple objects and pass them via the Spark driver program to the
filter function, which in turn would go and scan these 100 million rows to
generate the subset.

I was of the assumption that, since Spark allows key/value storage, there
would be some indexing for the keys stored, which would help Spark locate
objects.
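For concreteness, a rough Scala sketch of the key class (the real class is
Java; fields as in point 2 above):

// a case class gets value-based equals/hashCode for free
case class CustomTuple(dimensionOne: String,
                       dimensionTwo: String,
                       dimensionThree: String)

// the data set is then an RDD[(CustomTuple, Double)], which can be
// partitioned by key, e.g. with partitionBy(new HashPartitioner(n))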

Re: Does filter on an RDD scan every data item ?

Posted by dsiegel <de...@gmail.com>.
Also available is .sample(), which will randomly sample your RDD with or
without replacement and return an RDD.
.sample() takes a fraction, so it doesn't return an exact number of
elements,
e.g.
rdd.sample(true, 0.0001, 1)



Re: Does filter on an RDD scan every data item ?

Posted by dsiegel <de...@gmail.com>.
> nsareen wrote:
>> 1) Does the filter function scan every element saved in the RDD? If my
>> RDD represents 10 million rows, and I want to work on only 1000 of them,
>> is there an efficient way of filtering the subset without having to scan
>> every element?

Using .take(1000) may give a biased sample.
You may want to consider sampling your RDD (with or without replacement),
using a seed for randomization, with .takeSample(),
e.g.
rdd.takeSample(false, 1000, 1)
This returns an Array, from which you could create another RDD.
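For example (assuming the SparkContext is named sc):

val sampledRDD = sc.parallelize(rdd.takeSample(false, 1000, 1))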




Re: Does filter on an RDD scan every data item ?

Posted by nir <ni...@gmail.com>.
Looks like this has been supported since the 1.4 release :)
https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions
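Presumably this means filterByRange on sorted key/value RDDs; a rough
sketch ("pairs" is a placeholder, and the key bounds are made up):

// sortByKey attaches a RangePartitioner that filterByRange can exploit
val sorted = pairs.sortByKey().cache()

// only scans partitions whose key range can overlap the given bounds
val slice = sorted.filterByRange("AE000000", "AE999999")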



Re: Does filter on an RDD scan every data item ?

Posted by nir <ni...@gmail.com>.
"I don't think you could avoid this 
in general, right, in any system? "

Really?  nosql databases do efficient lookups(and scan) based on key and
partition. look at cassandra, hbase



Re: Does filter on an RDD scan every data item ?

Posted by nsareen <ns...@gmail.com>.
Thanks for the reply!

To be honest, I was expecting Spark to have some sort of indexing for keys,
which would help it locate the keys efficiently.

I wasn't using Spark SQL here, but if it helps perform this efficiently, I
can try it out. Can you please elaborate on how it would be helpful in this
scenario?

Thanks,
Nitin.



Re: Does filter on an RDD scan every data item ?

Posted by Sean Owen <so...@cloudera.com>.
take(1000) merely takes the first 1000 elements of an RDD. I don't
imagine that's what the OP means. filter() is how you select a subset
of elements to work with. Yes, this requires evaluating the predicate
on all 10M elements, at least once. I don't think you could avoid this
in general, right, in any system?

You might be able to take advantage of additional info you have. For
example if you have a particular partitioning system and you know that
elements of interest are only in one partition, you could create a
more efficient version with mapPartitions that simply drops other
partitions.

Same answer to your second question. It sounds like you expect that
Spark somehow has an index over keys, but it does not. It has no
special notion of where your keys are or what they are.

This changes a bit if you mean you are using the SQL APIs, but it
doesn't sound like you are.
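For example, a rough sketch of the mapPartitions idea ("pairs",
"partitioner" and "key" are placeholders; it assumes the RDD was
partitioned with a known HashPartitioner, so the partition holding the key
can be computed up front — tasks still start on every partition, but the
non-matching ones return immediately):

val targetPartition = partitioner.getPartition(key)
val matches = pairs.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == targetPartition) iter.filter { case (k, _) => k == key }
  else Iterator.empty
}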

On Tue, Dec 2, 2014 at 10:17 AM, Gen <ge...@gmail.com> wrote:
> Hi,
>
> For your first question, I think that we can use
> sc.parallelize(rdd.take(1000))
>
> For your second question, I am not sure. But I don't think that we can
> restrict the filter to certain partitions without scanning every element.
>
> Cheers
> Gen
>
>
> nsareen wrote:
>> Hi,
>>
>> I wanted some clarity on how the filter function of an RDD works.
>>
>> 1) Does the filter function scan every element saved in the RDD? If my RDD
>> represents 10 million rows, and I want to work on only 1000 of them, is
>> there an efficient way of filtering the subset without having to scan
>> every element?
>>
>> 2) If my RDD represents a key/value data set: when I filter this data
>> set of 10 million rows, can I specify that the search should be restricted
>> to only partitions which contain specific keys? Will Spark run my filter
>> operation on all partitions if the partitions are done by key,
>> irrespective of whether the key exists in a partition or not?

Re: Does filter on an RDD scan every data item ?

Posted by Gen <ge...@gmail.com>.
Hi,

For your first question, I think that we can use
sc.parallelize(rdd.take(1000))

For your second question, I am not sure. But I don't think that we can
restrict the filter to certain partitions without scanning every element.

Cheers
Gen


nsareen wrote:
> Hi,
>
> I wanted some clarity on how the filter function of an RDD works.
>
> 1) Does the filter function scan every element saved in the RDD? If my RDD
> represents 10 million rows, and I want to work on only 1000 of them, is
> there an efficient way of filtering the subset without having to scan
> every element?
>
> 2) If my RDD represents a key/value data set: when I filter this data
> set of 10 million rows, can I specify that the search should be restricted
> to only partitions which contain specific keys? Will Spark run my filter
> operation on all partitions if the partitions are done by key,
> irrespective of whether the key exists in a partition or not?




