Posted to dev@spark.apache.org by Eugene Cheipesh <ec...@gmail.com> on 2014/07/24 17:40:58 UTC

pre-filtered hadoop RDD use case

Hello,

I have an interesting use case for a pre-filtered RDD. I have two solutions
that I am not entirely happy with and would like to get some feedback and
thoughts. Perhaps it is a use case that could be more explicitly supported
in Spark.

My data has well-defined semantics for the key values that I can use to
pre-filter an RDD, so that partitions and records I will not need are never
loaded at all. In most cases this is a significant saving.

Essentially the dataset is geographic image tiles, as you would see on
Google Maps. The entire dataset could be huge, covering an entire continent
at high resolution. But if I want to work with a subset, let's say a single
city, it makes no sense for me to load all the partitions into memory just
so I can filter them as a first step.

My first attempt was to extend NewHadoopRDD as follows:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.NewHadoopRDD

abstract class PreFilteredHadoopRDD[K, V](
    sc: SparkContext,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    @transient conf: Configuration)
  extends NewHadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, conf)
{
  /** Returns true if the specific partition has relevant keys. */
  def includePartition(p: Partition): Boolean

  /** Returns true if the specific key in the partition passes the filter. */
  def includeKey(key: K): Boolean

  // Drop whole partitions that cannot contain relevant keys.
  override def getPartitions: Array[Partition] = {
    val partitions = super.getPartitions
    partitions.filter(includePartition)
  }

  // Filter on the key only; the parent's iterator still reads every value.
  override def compute(theSplit: Partition, context: TaskContext) = {
    val ii = super.compute(theSplit, context)
    new InterruptibleIterator(ii.context, ii.delegate.filter { case (k, _) => includeKey(k) })
  }
}

NewHadoopRDD for reference:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

This is nice and handles the partition portion of the issue well enough, but
by the time the iterator is created by super.compute there is no way to avoid
reading the values from records that do not pass my filter.

Since I am actually using ‘SequenceFileInputFormat’ as my InputFormat, I could
do better and avoid deserializing the values if I could get my hands on the
reader and re-implement compute(). But this does not seem possible to do
through extension because both NewHadoopRDD.confBroadcast and
NewHadoopPartition are private. There does not seem to be any choice but to
copy and paste NewHadoopRDD in order to extend it.
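
To make the "check the key before paying for the value" idea concrete, here
is a minimal sketch in plain Scala. KeyFirstReader and KeyFirstFilter are
hypothetical stand-ins, not Hadoop or Spark classes. Hadoop's
SequenceFile.Reader exposes a comparable split between next(key) and
getCurrentValue(value), but the sketch below is only a model of that pattern
and does not depend on Hadoop.

```scala
// Hypothetical stand-in for a key/value file reader (not a Hadoop class).
// Values are kept as raw bytes and only decoded when currentValue() is called.
final class KeyFirstReader(records: Vector[(Long, Array[Byte])]) {
  private var pos = -1
  var valuesDecoded = 0 // how many values were actually deserialized

  /** Advances to the next record and returns its key, or None at end of input. */
  def nextKey(): Option[Long] = {
    pos += 1
    if (pos < records.length) Some(records(pos)._1) else None
  }

  /** Decodes the value of the current record; this is the expensive step. */
  def currentValue(): String = {
    valuesDecoded += 1
    new String(records(pos)._2, "UTF-8")
  }
}

object KeyFirstFilter {
  /** Lazily yields records, decoding values only for keys accepted by includeKey. */
  def filtered(reader: KeyFirstReader, includeKey: Long => Boolean): Iterator[(Long, String)] =
    Iterator
      .continually(reader.nextKey())
      .takeWhile(_.isDefined)
      .map(_.get)
      .filter(includeKey)
      .map(key => (key, reader.currentValue()))
}
```

Because the iterator chain is lazy, the reader is still positioned on the
accepted record when currentValue() runs, and rejected records never have
their value decoded.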

The two solutions that are apparent are:
1. remove those private modifiers
2. factor out reader creation to a method that can be used to reimplement
compute() in a sub-class
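
As a rough sketch of what option 2 could look like, the following plain
Scala model factors reader creation into an overridable hook. Everything
here (ReaderHookSketch, BaseRDD, PreFilteredRDD, createReader, valuesRead)
is hypothetical and simplified; it is not the NewHadoopRDD API, and a
"reader" is modelled as an iterator of keys paired with deferred values.

```scala
// Hypothetical, simplified model of option 2; none of these names are Spark API.
object ReaderHookSketch {
  // A reader yields each key together with a thunk that produces its value on demand.
  type Reader = Iterator[(Int, () => String)]

  class BaseRDD(records: Vector[(Int, String)]) {
    var valuesRead = 0 // counts how many deferred values were forced

    /** The factored-out hook: builds the reader without consuming any values. */
    protected def createReader(): Reader =
      records.iterator.map { case (k, v) => (k, () => { valuesRead += 1; v }) }

    /** Default behaviour: force every value. */
    def compute(): Iterator[(Int, String)] =
      createReader().map { case (k, value) => (k, value()) }
  }

  class PreFilteredRDD(records: Vector[(Int, String)], includeKey: Int => Boolean)
      extends BaseRDD(records) {
    /** Reuses the inherited reader but forces values only for accepted keys. */
    override def compute(): Iterator[(Int, String)] =
      createReader().collect { case (k, value) if includeKey(k) => (k, value()) }
  }
}
```

The point of the design is that the subclass only replaces the iteration
logic; how the reader is constructed stays in the base class, so nothing
needs to be copied.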

I would be curious to hear if anybody has or has had a similar problem, and
any thoughts on the issue. If you think there is a PR in this I’d be happy to
code it up and submit it.


Thank you
--
Eugene Cheipesh



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/pre-filtered-hadoop-RDD-use-case-tp7484.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: pre-filtered hadoop RDD use case

Posted by "Yan Zhou.sc" <Ya...@huawei.com>.
Hi Reynold,

I agree that we should not hurry right now to modify/enhance APIs and could be satisfied with extending existing ones as much as possible. On the other hand, more intelligent data stores like HBase or Cassandra do support complex pushdowns, often more complex than their MR interfaces can now support. Given the increasing popularity of those stores, it will probably make sense to factor out some common behavior and put it in RDD. SchemaRDD is good for structured, tabular data, but RDD is more generic.

Again, this goal can be long-term and I did not mean to be in a hurry. It just occurred to me that, without extra processing info, there is no way to avoid "reimplementing" the "compute" method of NewHadoopRDD if the reader can't be changed, which brought me to the "bigger" picture.

Thanks and regards,

Yan


-----Original Message-----
From: Reynold Xin [mailto:rxin@databricks.com] 
Sent: Tuesday, July 29, 2014 11:44 AM
To: dev@spark.apache.org
Subject: Re: pre-filtered hadoop RDD use case

I am not sure if I agree that it lacks the mechanism to do pushdowns.

Hadoop InputFormat itself provides some basic mechanism to push down predicates already. The HBase InputFormat already implements it. In Spark, you can also run arbitrary user code, and you can decide what to do. You can also just subclass RDD to deal with arbitrary input sources. In the future, we will build a more standard API to interface with external stores in SchemaRDD.

The topic of discussion here is whether Eugene can reuse as much of HadoopRDD/NewHadoopRDD as possible. We can certainly make the HadoopRDD interface more pluggable, but that'd require opening up the internals of that class and stabilizing the API. I am not sure if it is something we'd want to do in a hurry, because there is a clear workaround right now (subclass RDD) and it is very hard to change that once the project is committed to that API.






On Tue, Jul 29, 2014 at 11:35 AM, Yan Zhou.sc <Ya...@huawei.com>
wrote:

> PartitionPruningRDD.scala still only handles, as said, the partition 
> portion of the issue.
>
> On the "record pruning" portion, although cheap fixes could be
> available for this issue as reported, I believe a fundamental
> issue is the lack of a mechanism for processing merging/pushdown.
> Given the popularity of columnar, and even more intelligent, data
> stores, it makes sense to support some "post processing" before an RDD is formed.
> This "post processing" could be performed by the RDD itself in
> compute(), or it could be performed by some data store which supports such pushdowns.
> In the latter case, such processing info should be made available to
> the data store RDD to pass on to the stores.
>
> For instance, FilteredRDD requires the parent to materialize the 
> record fully before it can start its own processing. If it could be 
> "merged" with its parent, a much smaller RDD footprint would result.
>
> "Pipelined execution" is mentioned for NarrowDependency in code, but 
> no implementation seems to be in place, and more optimization is 
> desired beyond just record-oriented execution pipelining.
>
>
>
> -----Original Message-----
> From: Reynold Xin [mailto:rxin@databricks.com]
> Sent: Tuesday, July 29, 2014 12:55 AM
> To: dev@spark.apache.org
> Subject: Re: pre-filtered hadoop RDD use case
>
> Would something like this help?
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala

Re: pre-filtered hadoop RDD use case

Posted by Reynold Xin <rx...@databricks.com>.
I am not sure if I agree that it lacks the mechanism to do pushdowns.

Hadoop InputFormat itself provides some basic mechanism to push down
predicates already. The HBase InputFormat already implements it. In Spark,
you can also run arbitrary user code, and you can decide what to do. You
can also just subclass RDD to deal with arbitrary input sources. In the
future, we will build a more standard API to interface with external stores
in SchemaRDD.

The topic of discussion here is whether Eugene can reuse as much of
HadoopRDD/NewHadoopRDD as possible. We can certainly make the HadoopRDD
interface more pluggable, but that'd require opening up the internals of
that class and stabilizing the API. I am not sure if it is something we'd
want to do in a hurry, because there is a clear workaround right now
(subclass RDD) and it is very hard to change that once the project is
committed to that API.







RE: pre-filtered hadoop RDD use case

Posted by "Yan Zhou.sc" <Ya...@huawei.com>.
PartitionPruningRDD.scala still only handles, as said, the partition portion of the issue.

On the "record pruning" portion, although cheap fixes could be available for this issue as reported, I believe a fundamental issue is the lack of a mechanism for processing merging/pushdown. Given the popularity of columnar, and even more intelligent, data stores, it makes sense to support some "post processing" before an RDD is formed. This "post processing" could be performed by the RDD itself in compute(), or it could be performed by some data store which supports such pushdowns. In the latter case, such processing info should be made available to the data store RDD to pass on to the stores.

For instance, FilteredRDD requires the parent to materialize the record fully before it can start its own processing. If it could be "merged" with its parent, a much smaller RDD footprint would result.
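
As a plain-Scala illustration of this point (not Spark code), the sketch
below chains a "read" stage and a "filter" stage over lazy iterators and
logs each step. PipelineDemo and the log messages are made up for the
example. It assumes the child stage is an ordinary iterator filter over the
parent's iterator: each record is fully read before the predicate sees it,
although records are handled one at a time rather than all at once.

```scala
import scala.collection.mutable.ArrayBuffer

object PipelineDemo {
  /** Runs a parent "read" stage and a child "filter" stage over lazy iterators,
    * returning the surviving keys and a log of the order in which steps ran. */
  def run(keys: Seq[Int]): (List[Int], List[String]) = {
    val log = ArrayBuffer.empty[String]
    // Parent stage: stands in for fully reading one record.
    val parent: Iterator[Int] = keys.iterator.map { k => log += s"read $k"; k }
    // Child stage: the predicate only runs after the record has been read.
    val child: Iterator[Int] = parent.filter { k => log += s"test $k"; k % 2 == 0 }
    val result = child.toList
    (result, log.toList)
  }
}
```

Running it on keys 1, 2, 3 yields a log that alternates "read" and "test"
for every record, including the rejected ones, which is the cost a merged or
pushed-down predicate would avoid.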

"Pipelined execution" is mentioned for NarrowDependency in code, but no implementation seems to be in place, and more optimization is desired beyond just record-oriented execution pipelining.




Re: pre-filtered hadoop RDD use case

Posted by Reynold Xin <rx...@databricks.com>.
Would something like this help?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
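
For illustration, here is a minimal sketch in plain Scala of how a
partition-index predicate for the tile use case might be built. KeyRange and
PartitionPruning are hypothetical names, and the per-partition key ranges are
assumed to be known up front; none of this is Spark API.

```scala
// Hypothetical per-partition metadata: smallest and largest tile key in each partition.
final case class KeyRange(min: Long, max: Long) {
  /** True if the two closed ranges share at least one key. */
  def overlaps(other: KeyRange): Boolean = min <= other.max && other.min <= max
}

object PartitionPruning {
  /** Builds an index predicate that keeps only partitions whose range overlaps the query. */
  def keepPartition(ranges: IndexedSeq[KeyRange], query: KeyRange): Int => Boolean =
    index => ranges(index).overlaps(query)
}
```

Judging by the linked source, a function of this shape (Int => Boolean over
the partition index) is what PartitionPruningRDD.create(rdd,
partitionFilterFunc) takes, so that only the selected partitions are
computed.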



