Posted to dev@spark.apache.org by Cristian O <cr...@googlemail.com> on 2015/11/11 13:59:35 UTC

Support for local disk columnar storage for DataFrames

Hi,

I was wondering if there's any planned support for local disk columnar
storage.

This could be an extension of the in-memory columnar store, or possibly
something similar to the recently added local checkpointing for RDDs

This could also have the added benefit of enabling iterative usage for
DataFrames by pruning the query plan through local checkpoints.
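
For reference, here is a rough sketch of what exists today at the RDD level
versus the DataFrame-level equivalent being suggested. sc stands for an existing
SparkContext; the DataFrame method in the comment is hypothetical and does not
exist in current Spark:

// RDD-level local checkpointing (available since Spark 1.5): persists the data
// on the executors and truncates the lineage once it is materialized.
val rdd = sc.parallelize(1 to 1000000).map(i => (i, 1))
rdd.localCheckpoint()   // marks the RDD for local checkpointing
rdd.count()             // first action materializes the data; lineage is then truncated

// Hypothetical DataFrame-level equivalent (not in Spark today):
// val checkpointedDf = someDf.checkpoint()  // would persist the columnar blocks
//                                           // locally and prune the logical plan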

A further enhancement would be to add update support to the columnar format
(in the immutable copy-on-write sense of course), by maintaining references
to unchanged row blocks and only copying and mutating the ones that have
changed.

A use case here is streaming and merging updates in a large dataset that
can be efficiently stored internally in a columnar format, rather than
accessing a less efficient external data store like HDFS or Cassandra.

Thanks,
Cristian

Re: Support for local disk columnar storage for DataFrames

Posted by Andrew Duffy <an...@gmail.com>.
Relevant link:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
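
For context, the manual round-trip that guide describes looks roughly like this
(df stands for any existing DataFrame, paths are illustrative); note the explicit
read-back, which is the part a built-in DataFrame checkpoint would avoid:

df.write.parquet("/tmp/people.parquet")
val reloaded = sqlContext.read.parquet("/tmp/people.parquet")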

On Wed, Nov 11, 2015 at 7:31 PM, Reynold Xin <rx...@databricks.com> wrote:

> Thanks for the email. Can you explain what the difference is between this
> and existing formats such as Parquet/ORC?
>
>
> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
> cristian.b.opris@googlemail.com> wrote:
>
>> Hi,
>>
>> I was wondering if there's any planned support for local disk columnar
>> storage.
>>
>> This could be an extension of the in-memory columnar store, or possibly
>> something similar to the recently added local checkpointing for RDDs
>>
>> This could also have the added benefit of enabling iterative usage for
>> DataFrames by pruning the query plan through local checkpoints.
>>
>> A further enhancement would be to add update support to the columnar
>> format (in the immutable copy-on-write sense of course), by maintaining
>> references to unchanged row blocks and only copying and mutating the ones
>> that have changed.
>>
>> A use case here is streaming and merging updates in a large dataset that
>> can be efficiently stored internally in a columnar format, rather than
>> accessing a less efficient external data store like HDFS or Cassandra.
>>
>> Thanks,
>> Cristian
>>
>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Cristian O <cr...@googlemail.com>.
Raised this JIRA for checkpointing; hopefully it gets some priority, as it's very
useful and relatively straightforward to implement:

https://issues.apache.org/jira/browse/SPARK-11879

On 18 November 2015 at 16:31, Cristian O <cr...@googlemail.com>
wrote:

> Hi,
>
> While these OSS efforts are interesting, they're for now quite unproven.
> Personally, I would be much more interested in seeing Spark incrementally
> moving towards supporting updating DataFrames on various storage
> substrates, and first of all locally, perhaps as an extension of cached
> DataFrames.
>
> However, before we get full-blown update support, I would suggest two
> enhancements that are fairly straightforward with the current design. If
> they make sense, please let me know and I'll add them as JIRAs:
>
> 1. Checkpoint support for DataFrames - as mentioned this can be as simple
> as saving to a parquet file or some other format, but would not require
> re-reading the file to alter the lineage, and would also prune the logical
> plan. Alternatively checkpointing a cached DataFrame can delegate to
> checkpointing the underlying RDD but again needs to prune the logical plan.
>
> 2. Efficient transformation of cached DataFrames to cached DataFrames - an
> efficient copy-on-write mechanism can be used to avoid unpacking
> CachedBatches (row groups) into InternalRows when building a cached
> DataFrame out of a source cached DataFrame through transformations (like an
> outer join) that only affect a small subset of rows. Statistics and
> partitioning information can be used to determine which row groups are
> affected and which can be copied *by reference* unchanged. This would
> effectively allow performing immutable updates of cached DataFrames in
> scenarios like Streaming or other iterative use cases like ML.
>
> Thanks,
> Cristian
>
>
>
> On 16 November 2015 at 08:30, Mark Hamstra <ma...@clearstorydata.com>
> wrote:
>
>> FiloDB is also closely related: https://github.com/tuplejump/FiloDB
>>
>> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <
>> nick.pentreath@gmail.com> wrote:
>>
>>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>>> input/output format support:
>>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>>
>>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>>
>>>> This (updates) is something we are going to think about in the next
>>>> release or two.
>>>>
>>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>>> cristian.b.opris@googlemail.com> wrote:
>>>>
>>>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>>>> example.
>>>>>
>>>>> Basically I want to conceptually do this:
>>>>>
>>>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>>>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>>>>
>>>>> bigDf.cache()
>>>>>
>>>>> bigDf.registerTempTable("big")
>>>>> deltaDf.registerTempTable("delta")
>>>>>
>>>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>>>
>>>>> newBigDf.cache()
>>>>> bigDf.unpersist()
>>>>>
>>>>>
>>>>> This is essentially an update of keys "1" and "50000" only, in a
>>>>> dataset of 1 million keys.
>>>>>
>>>>> This can be achieved efficiently if the join would preserve the cached
>>>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>>>> blocks corresponding to the matching join keys.
>>>>>
>>>>> Statistics can determine which blocks actually need mutating. Note
>>>>> also that shuffling is not required assuming both dataframes are
>>>>> pre-partitioned by the same key K.
>>>>>
>>>>> In SQL this could actually be expressed as an UPDATE statement or for
>>>>> a more generalized use as a MERGE UPDATE:
>>>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>>>
>>>>> While this may seem like a very special case optimization, it would
>>>>> effectively implement UPDATE support for cached DataFrames, for both
>>>>> optimal and non-optimal usage.
>>>>>
>>>>> I appreciate there's quite a lot here, so thank you for taking the
>>>>> time to consider it.
>>>>>
>>>>> Cristian
>>>>>
>>>>>
>>>>>
>>>>> On 12 November 2015 at 15:49, Cristian O <
>>>>> cristian.b.opris@googlemail.com> wrote:
>>>>>
>>>>>> Hi Reynold,
>>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> Parquet may very well be used as the underlying implementation, but
>>>>>> this is about more than a particular storage representation.
>>>>>>
>>>>>> There are a few things here that are inter-related and open different
>>>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>>>
>>>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>>>> parquet, just using that as a checkpoint would currently require explicitly
>>>>>> reading it back. A proper checkpoint implementation would just save
>>>>>> (perhaps asynchronously) and prune the logical plan while allowing continued
>>>>>> use of the same DF, now backed by the checkpoint.
>>>>>>
>>>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>>>> that may arise from unbounded expansion with iterative use-cases, like this
>>>>>> one I encountered recently:
>>>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>>>
>>>>>> But really what I'm after here is:
>>>>>>
>>>>>> 2. Efficient updating of cached DataFrames - The main use case here
>>>>>> is keeping a relatively large dataset cached and updating it iteratively
>>>>>> from streaming. For example one would like to perform ad-hoc queries on an
>>>>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>>>>> an increasingly common use case. Note that the dataset may require merging
>>>>>> (like adding) or overriding values by key, so simply appending is not
>>>>>> sufficient.
>>>>>>
>>>>>> This is very similar in concept to updateStateByKey for regular
>>>>>> RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at
>>>>>> CachedBatch level (the row blocks for the columnar representation).
>>>>>>
>>>>>> This can currently be simulated with UNION or (OUTER) JOINs, but that
>>>>>> is very inefficient as it requires copying and recaching the entire
>>>>>> dataset, and unpersisting the original one. There are also the
>>>>>> aforementioned problems with unbounded logical plans (physical plans are
>>>>>> fine).
>>>>>>
>>>>>> These two together, checkpointing and updating cached DataFrames,
>>>>>> would give fault-tolerant efficient updating of DataFrames, meaning
>>>>>> streaming apps can take advantage of the compact columnar representation
>>>>>> and Tungsten optimisations.
>>>>>>
>>>>>> I'm not quite sure if something like this can be achieved by other
>>>>>> means or has been investigated before, hence why I'm looking for feedback
>>>>>> here.
>>>>>>
>>>>>> While one could use external data stores, they would have the added
>>>>>> IO penalty, plus most of what's available at the moment is either HDFS
>>>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>>>> space overhead over columnar formats.
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the email. Can you explain what the difference is between
>>>>>>> this and existing formats such as Parquet/ORC?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>>>> cristian.b.opris@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was wondering if there's any planned support for local disk
>>>>>>>> columnar storage.
>>>>>>>>
>>>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>>>> possibly something similar to the recently added local checkpointing for
>>>>>>>> RDDs
>>>>>>>>
>>>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>>>
>>>>>>>> A further enhancement would be to add update support to the
>>>>>>>> columnar format (in the immutable copy-on-write sense of course), by
>>>>>>>> maintaining references to unchanged row blocks and only copying and
>>>>>>>> mutating the ones that have changed.
>>>>>>>>
>>>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>>>> that can be efficiently stored internally in a columnar format, rather than
>>>>>>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cristian
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Cristian O <cr...@googlemail.com>.
Hi,

While these OSS efforts are interesting, they're for now quite unproven.
Personally, I would be much more interested in seeing Spark incrementally
moving towards supporting updating DataFrames on various storage
substrates, and first of all locally, perhaps as an extension of cached
DataFrames.

However, before we get full-blown update support, I would suggest two
enhancements that are fairly straightforward with the current design. If
they make sense, please let me know and I'll add them as JIRAs:

1. Checkpoint support for DataFrames - as mentioned this can be as simple
as saving to a parquet file or some other format, but would not require
re-reading the file to alter the lineage, and would also prune the logical
plan. Alternatively checkpointing a cached DataFrame can delegate to
checkpointing the underlying RDD but again needs to prune the logical plan.
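
A rough sketch of the second option above, delegating to the underlying RDD; df
stands for any existing (ideally cached) DataFrame and the checkpoint directory
is illustrative. The schema has to be re-applied by hand because the DataFrame's
logical plan is not pruned automatically, which is exactly the missing piece:

sqlContext.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val rowRdd = df.rdd          // RDD[Row] backing the DataFrame
rowRdd.checkpoint()          // mark for reliable checkpointing
rowRdd.count()               // materialize; the RDD lineage is now truncated

// Rebuild a DataFrame on top of the checkpointed rows; its logical plan is a
// plain leaf node, so the original query plan is effectively pruned.
val checkpointedDf = sqlContext.createDataFrame(rowRdd, df.schema)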

2. Efficient transformation of cached DataFrames to cached DataFrames - an
efficient copy-on-write mechanism can be used to avoid unpacking
CachedBatches (row groups) into InternalRows when building a cached
DataFrame out of a source cached DataFrame through transformations (like an
outer join) that only affect a small subset of rows. Statistics and
partitioning information can be used to determine which row groups are
affected and which can be copied *by reference* unchanged. This would
effectively allow performing immutable updates of cached DataFrames in
scenarios like Streaming or other iterative use cases like ML.
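
To make the copy-on-write idea concrete, here is a minimal, purely illustrative
sketch in plain Scala. The Batch type is a placeholder, not Spark's internal
CachedBatch, and the min/max fields stand in for the per-batch statistics
mentioned above; untouched row groups are reused by reference and only the
affected ones are copied and rewritten:

case class Batch(minKey: Int, maxKey: Int, rows: Array[(Int, Int)]) // stand-in for a cached row group

def mergeDelta(batches: Seq[Batch], delta: Map[Int, Int]): Seq[Batch] =
  batches.map { b =>
    // per-batch statistics decide whether this row group is affected at all
    val hits = delta.filterKeys(k => k >= b.minKey && k <= b.maxKey)
    if (hits.isEmpty) b // unaffected batch: kept by reference, no copy
    else b.copy(rows = b.rows.map { case (k, v) => (k, v + hits.getOrElse(k, 0)) }) // copy-on-write
  }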

Thanks,
Cristian



On 16 November 2015 at 08:30, Mark Hamstra <ma...@clearstorydata.com> wrote:

> FiloDB is also closely related: https://github.com/tuplejump/FiloDB
>
> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <nick.pentreath@gmail.com
> > wrote:
>
>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>> input/output format support:
>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>
>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> This (updates) is something we are going to think about in the next
>>> release or two.
>>>
>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>> cristian.b.opris@googlemail.com> wrote:
>>>
>>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>>> example.
>>>>
>>>> Basically I want to conceptually do this:
>>>>
>>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>>>
>>>> bigDf.cache()
>>>>
>>>> bigDf.registerTempTable("big")
>>>> deltaDf.registerTempTable("delta")
>>>>
>>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>>
>>>> newBigDf.cache()
>>>> bigDf.unpersist()
>>>>
>>>>
>>>> This is essentially an update of keys "1" and "50000" only, in a
>>>> dataset of 1 million keys.
>>>>
>>>> This can be achieved efficiently if the join would preserve the cached
>>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>>> blocks corresponding to the matching join keys.
>>>>
>>>> Statistics can determine which blocks actually need mutating. Note also
>>>> that shuffling is not required assuming both dataframes are pre-partitioned
>>>> by the same key K.
>>>>
>>>> In SQL this could actually be expressed as an UPDATE statement or for a
>>>> more generalized use as a MERGE UPDATE:
>>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>>
>>>> While this may seem like a very special case optimization, it would
>>>> effectively implement UPDATE support for cached DataFrames, for both
>>>> optimal and non-optimal usage.
>>>>
>>>> I appreciate there's quite a lot here, so thank you for taking the time
>>>> to consider it.
>>>>
>>>> Cristian
>>>>
>>>>
>>>>
>>>> On 12 November 2015 at 15:49, Cristian O <
>>>> cristian.b.opris@googlemail.com> wrote:
>>>>
>>>>> Hi Reynold,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> Parquet may very well be used as the underlying implementation, but
>>>>> this is about more than a particular storage representation.
>>>>>
>>>>> There are a few things here that are inter-related and open different
>>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>>
>>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>>> parquet, just using that as a checkpoint would currently require explicitly
>>>>> reading it back. A proper checkpoint implementation would just save
>>>>> (perhaps asynchronously) and prune the logical plan while allowing continued
>>>>> use of the same DF, now backed by the checkpoint.
>>>>>
>>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>>> that may arise from unbounded expansion with iterative use-cases, like this
>>>>> one I encountered recently:
>>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>>
>>>>> But really what I'm after here is:
>>>>>
>>>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>>>> keeping a relatively large dataset cached and updating it iteratively from
>>>>> streaming. For example one would like to perform ad-hoc queries on an
>>>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>>>> an increasingly common use case. Note that the dataset may require merging
>>>>> (like adding) or overriding values by key, so simply appending is not
>>>>> sufficient.
>>>>>
>>>>> This is very similar in concept to updateStateByKey for regular
>>>>> RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at
>>>>> CachedBatch level (the row blocks for the columnar representation).
>>>>>
>>>>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>>>>> very inefficient as it requires copying and recaching the entire dataset,
>>>>> and unpersisting the original one. There are also the aforementioned
>>>>> problems with unbounded logical plans (physical plans are fine).
>>>>>
>>>>> These two together, checkpointing and updating cached DataFrames,
>>>>> would give fault-tolerant efficient updating of DataFrames, meaning
>>>>> streaming apps can take advantage of the compact columnar representation
>>>>> and Tungsten optimisations.
>>>>>
>>>>> I'm not quite sure if something like this can be achieved by other
>>>>> means or has been investigated before, hence why I'm looking for feedback
>>>>> here.
>>>>>
>>>>> While one could use external data stores, they would have the added IO
>>>>> penalty, plus most of what's available at the moment is either HDFS
>>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>>> space overhead over columnar formats.
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com> wrote:
>>>>>
>>>>>> Thanks for the email. Can you explain what the difference is between
>>>>>> this and existing formats such as Parquet/ORC?
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>>> cristian.b.opris@googlemail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I was wondering if there's any planned support for local disk
>>>>>>> columnar storage.
>>>>>>>
>>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>>> possibly something similar to the recently added local checkpointing for
>>>>>>> RDDs
>>>>>>>
>>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>>
>>>>>>> A further enhancement would be to add update support to the columnar
>>>>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>>>>> references to unchanged row blocks and only copying and mutating the ones
>>>>>>> that have changed.
>>>>>>>
>>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>>> that can be efficiently stored internally in a columnar format, rather than
>>>>>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cristian
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Mark Hamstra <ma...@clearstorydata.com>.
FiloDB is also closely related: https://github.com/tuplejump/FiloDB

On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <ni...@gmail.com>
wrote:

> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
> input/output format support:
> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>
> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <rx...@databricks.com> wrote:
>
>> This (updates) is something we are going to think about in the next
>> release or two.
>>
>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>> cristian.b.opris@googlemail.com> wrote:
>>
>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>> example.
>>>
>>> Basically I want to conceptually do this:
>>>
>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>>
>>> bigDf.cache()
>>>
>>> bigDf.registerTempTable("big")
>>> deltaDf.registerTempTable("delta")
>>>
>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>
>>> newBigDf.cache()
>>> bigDf.unpersist()
>>>
>>>
>>> This is essentially an update of keys "1" and "50000" only, in a dataset
>>> of 1 million keys.
>>>
>>> This can be achieved efficiently if the join would preserve the cached
>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>> blocks corresponding to the matching join keys.
>>>
>>> Statistics can determine which blocks actually need mutating. Note also
>>> that shuffling is not required assuming both dataframes are pre-partitioned
>>> by the same key K.
>>>
>>> In SQL this could actually be expressed as an UPDATE statement or for a
>>> more generalized use as a MERGE UPDATE:
>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>
>>> While this may seem like a very special case optimization, it would
>>> effectively implement UPDATE support for cached DataFrames, for both
>>> optimal and non-optimal usage.
>>>
>>> I appreciate there's quite a lot here, so thank you for taking the time
>>> to consider it.
>>>
>>> Cristian
>>>
>>>
>>>
>>> On 12 November 2015 at 15:49, Cristian O <
>>> cristian.b.opris@googlemail.com> wrote:
>>>
>>>> Hi Reynold,
>>>>
>>>> Thanks for your reply.
>>>>
>>>> Parquet may very well be used as the underlying implementation, but
>>>> this is about more than a particular storage representation.
>>>>
>>>> There are a few things here that are inter-related and open different
>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>
>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>> parquet, just using that as a checkpoint would currently require explicitly
>>>> reading it back. A proper checkpoint implementation would just save
>>>> (perhaps asynchronously) and prune the logical plan while allowing continued
>>>> use of the same DF, now backed by the checkpoint.
>>>>
>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>> that may arise from unbounded expansion with iterative use-cases, like this
>>>> one I encountered recently:
>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>
>>>> But really what I'm after here is:
>>>>
>>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>>> keeping a relatively large dataset cached and updating it iteratively from
>>>> streaming. For example one would like to perform ad-hoc queries on an
>>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>>> an increasingly common use case. Note that the dataset may require merging
>>>> (like adding) or overriding values by key, so simply appending is not
>>>> sufficient.
>>>>
>>>> This is very similar in concept to updateStateByKey for regular RDDs,
>>>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>>>> level (the row blocks for the columnar representation).
>>>>
>>>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>>>> very inefficient as it requires copying and recaching the entire dataset,
>>>> and unpersisting the original one. There are also the aforementioned
>>>> problems with unbounded logical plans (physical plans are fine).
>>>>
>>>> These two together, checkpointing and updating cached DataFrames, would
>>>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>>>> apps can take advantage of the compact columnar representation and Tungsten
>>>> optimisations.
>>>>
>>>> I'm not quite sure if something like this can be achieved by other
>>>> means or has been investigated before, hence why I'm looking for feedback
>>>> here.
>>>>
>>>> While one could use external data stores, they would have the added IO
>>>> penalty, plus most of what's available at the moment is either HDFS
>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>> space overhead over columnar formats.
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com> wrote:
>>>>
>>>>> Thanks for the email. Can you explain what the difference is between
>>>>> this and existing formats such as Parquet/ORC?
>>>>>
>>>>>
>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>> cristian.b.opris@googlemail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I was wondering if there's any planned support for local disk
>>>>>> columnar storage.
>>>>>>
>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>> possibly something similar to the recently added local checkpointing for
>>>>>> RDDs
>>>>>>
>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>
>>>>>> A further enhancement would be to add update support to the columnar
>>>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>>>> references to unchanged row blocks and only copying and mutating the ones
>>>>>> that have changed.
>>>>>>
>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>> that can be efficiently stored internally in a columnar format, rather than
>>>>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Nick Pentreath <ni...@gmail.com>.
Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
input/output format support:
https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
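
For anyone curious, reading through such a Hadoop InputFormat from Spark would
look roughly like the sketch below; sc is an existing SparkContext. The key/value
classes (NullWritable, RowResult) and the empty Configuration are assumptions
based on the linked class rather than verified against the Kudu API, so treat the
details as illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.kududb.client.RowResult              // assumed value class
import org.kududb.mapreduce.KuduTableInputFormat

val conf = new Configuration()                  // Kudu master / table settings omitted here
val kuduRows = sc.newAPIHadoopRDD(conf, classOf[KuduTableInputFormat],
  classOf[NullWritable], classOf[RowResult])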

On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <rx...@databricks.com> wrote:

> This (updates) is something we are going to think about in the next
> release or two.
>
> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
> cristian.b.opris@googlemail.com> wrote:
>
>> Sorry, apparently only replied to Reynold, meant to copy the list as
>> well, so I'm self replying and taking the opportunity to illustrate with an
>> example.
>>
>> Basically I want to conceptually do this:
>>
>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>
>> bigDf.cache()
>>
>> bigDf.registerTempTable("big")
>> deltaDf.registerTempTable("delta")
>>
>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>
>> newBigDf.cache()
>> bigDf.unpersist()
>>
>>
>> This is essentially an update of keys "1" and "50000" only, in a dataset
>> of 1 million keys.
>>
>> This can be achieved efficiently if the join would preserve the cached
>> blocks that have been unaffected, and only copy and mutate the 2 affected
>> blocks corresponding to the matching join keys.
>>
>> Statistics can determine which blocks actually need mutating. Note also
>> that shuffling is not required assuming both dataframes are pre-partitioned
>> by the same key K.
>>
>> In SQL this could actually be expressed as an UPDATE statement or for a
>> more generalized use as a MERGE UPDATE:
>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>
>> While this may seem like a very special case optimization, it would
>> effectively implement UPDATE support for cached DataFrames, for both
>> optimal and non-optimal usage.
>>
>> I appreciate there's quite a lot here, so thank you for taking the time
>> to consider it.
>>
>> Cristian
>>
>>
>>
>> On 12 November 2015 at 15:49, Cristian O <cristian.b.opris@googlemail.com
>> > wrote:
>>
>>> Hi Reynold,
>>>
>>> Thanks for your reply.
>>>
>>> Parquet may very well be used as the underlying implementation, but this
>>> is about more than a particular storage representation.
>>>
>>> There are a few things here that are inter-related and open different
>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>
>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>> parquet, just using that as a checkpoint would currently require explicitly
>>> reading it back. A proper checkpoint implementation would just save
>>> (perhaps asynchronously) and prune the logical plan while allowing continued
>>> use of the same DF, now backed by the checkpoint.
>>>
>>> It's important to prune the logical plan to avoid all kinds of issues
>>> that may arise from unbounded expansion with iterative use-cases, like this
>>> one I encountered recently:
>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>
>>> But really what I'm after here is:
>>>
>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>> keeping a relatively large dataset cached and updating it iteratively from
>>> streaming. For example one would like to perform ad-hoc queries on an
>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>> an increasingly common use case. Note that the dataset may require merging
>>> (like adding) or overriding values by key, so simply appending is not
>>> sufficient.
>>>
>>> This is very similar in concept to updateStateByKey for regular RDDs,
>>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>>> level (the row blocks for the columnar representation).
>>>
>>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>>> very inefficient as it requires copying and recaching the entire dataset,
>>> and unpersisting the original one. There are also the aforementioned
>>> problems with unbounded logical plans (physical plans are fine).
>>>
>>> These two together, checkpointing and updating cached DataFrames, would
>>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>>> apps can take advantage of the compact columnar representation and Tungsten
>>> optimisations.
>>>
>>> I'm not quite sure if something like this can be achieved by other means
>>> or has been investigated before, hence why I'm looking for feedback here.
>>>
>>> While one could use external data stores, they would have the added IO
>>> penalty, plus most of what's available at the moment is either HDFS
>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>> space overhead over columnar formats.
>>>
>>> Thanks,
>>> Cristian
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com> wrote:
>>>
>>>> Thanks for the email. Can you explain what the difference is between
>>>> this and existing formats such as Parquet/ORC?
>>>>
>>>>
>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>> cristian.b.opris@googlemail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was wondering if there's any planned support for local disk columnar
>>>>> storage.
>>>>>
>>>>> This could be an extension of the in-memory columnar store, or
>>>>> possibly something similar to the recently added local checkpointing for
>>>>> RDDs
>>>>>
>>>>> This could also have the added benefit of enabling iterative usage for
>>>>> DataFrames by pruning the query plan through local checkpoints.
>>>>>
>>>>> A further enhancement would be to add update support to the columnar
>>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>>> references to unchanged row blocks and only copying and mutating the ones
>>>>> that have changed.
>>>>>
>>>>> A use case here is streaming and merging updates in a large dataset
>>>>> that can be efficiently stored internally in a columnar format, rather than
>>>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Reynold Xin <rx...@databricks.com>.
This (updates) is something we are going to think about in the next release
or two.

On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.opris@googlemail.com
> wrote:

> Sorry, apparently only replied to Reynold, meant to copy the list as well,
> so I'm self replying and taking the opportunity to illustrate with an
> example.
>
> Basically I want to conceptually do this:
>
> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>
> bigDf.cache()
>
> bigDf.registerTempTable("big")
> deltaDf.registerTempTable("delta")
>
> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>
> newBigDf.cache()
> bigDf.unpersist()
>
>
> This is essentially an update of keys "1" and "50000" only, in a dataset
> of 1 million keys.
>
> This can be achieved efficiently if the join would preserve the cached
> blocks that have been unaffected, and only copy and mutate the 2 affected
> blocks corresponding to the matching join keys.
>
> Statistics can determine which blocks actually need mutating. Note also
> that shuffling is not required assuming both dataframes are pre-partitioned
> by the same key K.
>
> In SQL this could actually be expressed as an UPDATE statement or for a
> more generalized use as a MERGE UPDATE:
> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>
> While this may seem like a very special case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both
> optimal and non-optimal usage.
>
> I appreciate there's quite a lot here, so thank you for taking the time to
> consider it.
>
> Cristian
>
>
>
> On 12 November 2015 at 15:49, Cristian O <cr...@googlemail.com>
> wrote:
>
>> Hi Reynold,
>>
>> Thanks for your reply.
>>
>> Parquet may very well be used as the underlying implementation, but this
>> is about more than a particular storage representation.
>>
>> There are a few things here that are inter-related and open different
>> possibilities, so it's hard to structure, but I'll give it a try:
>>
>> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
>> just using that as a checkpoint would currently require explicitly reading
>> it back. A proper checkpoint implementation would just save (perhaps
>> asynchronously) and prune the logical plan while allowing continued use of
>> the same DF, now backed by the checkpoint.
>>
>> It's important to prune the logical plan to avoid all kinds of issues
>> that may arise from unbounded expansion with iterative use-cases, like this
>> one I encountered recently:
>> https://issues.apache.org/jira/browse/SPARK-11596
>>
>> But really what I'm after here is:
>>
>> 2. Efficient updating of cached DataFrames - The main use case here is
>> keeping a relatively large dataset cached and updating it iteratively from
>> streaming. For example one would like to perform ad-hoc queries on an
>> incrementally updated, cached DataFrame. I expect this is already becoming
>> an increasingly common use case. Note that the dataset may require merging
>> (like adding) or overriding values by key, so simply appending is not
>> sufficient.
>>
>> This is very similar in concept to updateStateByKey for regular RDDs,
>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>> level (the row blocks for the columnar representation).
>>
>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>> very inefficient as it requires copying and recaching the entire dataset,
>> and unpersisting the original one. There are also the aforementioned
>> problems with unbounded logical plans (physical plans are fine).
>>
>> These two together, checkpointing and updating cached DataFrames, would
>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>> apps can take advantage of the compact columnar representation and Tungsten
>> optimisations.
>>
>> I'm not quite sure if something like this can be achieved by other means
>> or has been investigated before, hence why I'm looking for feedback here.
>>
>> While one could use external data stores, they would have the added IO
>> penalty, plus most of what's available at the moment is either HDFS
>> (extremely inefficient for updates) or key-value stores that have 5-10x
>> space overhead over columnar formats.
>>
>> Thanks,
>> Cristian
>>
>>
>>
>>
>>
>>
>> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Thanks for the email. Can you explain what the difference is between
>>> this and existing formats such as Parquet/ORC?
>>>
>>>
>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>> cristian.b.opris@googlemail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I was wondering if there's any planned support for local disk columnar
>>>> storage.
>>>>
>>>> This could be an extension of the in-memory columnar store, or possibly
>>>> something similar to the recently added local checkpointing for RDDs
>>>>
>>>> This could also have the added benefit of enabling iterative usage for
>>>> DataFrames by pruning the query plan through local checkpoints.
>>>>
>>>> A further enhancement would be to add update support to the columnar
>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>> references to unchanged row blocks and only copying and mutating the ones
>>>> that have changed.
>>>>
>>>> A use case here is streaming and merging updates in a large dataset
>>>> that can be efficiently stored internally in a columnar format, rather than
>>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>
>>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Cristian O <cr...@googlemail.com>.
Sorry, apparently only replied to Reynold, meant to copy the list as well,
so I'm self replying and taking the opportunity to illustrate with an
example.

Basically I want to conceptually do this:

val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")

bigDf.cache()

bigDf.registerTempTable("big")
deltaDf.registerTempTable("delta")

val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")

newBigDf.cache()
bigDf.unpersist()


This is essentially an update of keys "1" and "50000" only, in a dataset of
1 million keys.

This can be achieved efficiently if the join would preserve the cached
blocks that have been unaffected, and only copy and mutate the 2 affected
blocks corresponding to the matching join keys.

Statistics can determine which blocks actually need mutating. Note also
that shuffling is not required assuming both dataframes are pre-partitioned
by the same key K.
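
One way to set up that precondition today is to co-partition both sides at the
RDD level before converting to DataFrames. This is only a sketch of the
pre-partitioning itself; Spark SQL's join planner does not currently recognise
this co-partitioning, which is part of what would need to change:

import org.apache.spark.HashPartitioner
import sqlContext.implicits._   // assumed, as in the example above

val part = new HashPartitioner(100)

val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000)
  .map(i => (i, 1)).partitionBy(part).toDF("k", "v")
val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000))
  .map(i => (i, 1)).partitionBy(part).toDF("k", "v")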

In SQL this could actually be expressed as an UPDATE statement or for a
more generalized use as a MERGE UPDATE:
https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
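
Purely for illustration, the left-join example above would read roughly like this
in that MERGE syntax. Spark SQL has no UPDATE or MERGE support, so the statement
below is hypothetical and would not run; it just restates the same update:

// Hypothetical only: Spark SQL does not accept UPDATE/MERGE statements.
sqlContext.sql("""
  MERGE INTO big
  USING delta ON big.k = delta.k
  WHEN MATCHED THEN UPDATE SET v = big.v + delta.v
""")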

While this may seem like a very special case optimization, it would
effectively implement UPDATE support for cached DataFrames, for both
optimal and non-optimal usage.

I appreciate there's quite a lot here, so thank you for taking the time to
consider it.

Cristian



On 12 November 2015 at 15:49, Cristian O <cr...@googlemail.com>
wrote:

> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but this
> is about more than a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
> just using that as a checkpoint would currently require explicitly reading
> it back. A proper checkpoint implementation would just save (perhaps
> asynchronously) and prune the logical plan while allowing continued use of
> the same DF, now backed by the checkpoint.
>
> It's important to prune the logical plan to avoid all kinds of issues that
> may arise from unbounded expansion with iterative use-cases, like this one
> I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>
> But really what I'm after here is:
>
> 2. Efficient updating of cached DataFrames - The main use case here is
> keeping a relatively large dataset cached and updating it iteratively from
> streaming. For example one would like to perform ad-hoc queries on an
> incrementally updated, cached DataFrame. I expect this is already becoming
> an increasingly common use case. Note that the dataset may require merging
> (like adding) or overriding values by key, so simply appending is not
> sufficient.
>
> This is very similar in concept to updateStateByKey for regular RDDs,
> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
> level (the row blocks for the columnar representation).
>
> This can currently be simulated with UNION or (OUTER) JOINs, but that is
> very inefficient as it requires copying and recaching the entire dataset,
> and unpersisting the original one. There are also the aforementioned
> problems with unbounded logical plans (physical plans are fine).
>
> These two together, checkpointing and updating cached DataFrames, would
> give fault-tolerant efficient updating of DataFrames, meaning streaming
> apps can take advantage of the compact columnar representation and Tungsten
> optimisations.
>
> I'm not quite sure if something like this can be achieved by other means
> or has been investigated before, hence why I'm looking for feedback here.
>
> While one could use external data stores, they would have the added IO
> penalty, plus most of what's available at the moment is either HDFS
> (extremely inefficient for updates) or key-value stores that have 5-10x
> space overhead over columnar formats.
>
> Thanks,
> Cristian
>
>
>
>
>
>
> On 12 November 2015 at 03:31, Reynold Xin <rx...@databricks.com> wrote:
>
>> Thanks for the email. Can you explain what the difference is between this
>> and existing formats such as Parquet/ORC?
>>
>>
>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>> cristian.b.opris@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I was wondering if there's any planned support for local disk columnar
>>> storage.
>>>
>>> This could be an extension of the in-memory columnar store, or possibly
>>> something similar to the recently added local checkpointing for RDDs
>>>
>>> This could also have the added benefit of enabling iterative usage for
>>> DataFrames by pruning the query plan through local checkpoints.
>>>
>>> A further enhancement would be to add update support to the columnar
>>> format (in the immutable copy-on-write sense of course), by maintaining
>>> references to unchanged row blocks and only copying and mutating the ones
>>> that have changed.
>>>
>>> A use case here is streaming and merging updates in a large dataset that
>>> can be efficiently stored internally in a columnar format, rather than
>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>
>>> Thanks,
>>> Cristian
>>>
>>
>>
>

Re: Support for local disk columnar storage for DataFrames

Posted by Reynold Xin <rx...@databricks.com>.
Thanks for the email. Can you explain what the difference is between this
and existing formats such as Parquet/ORC?


On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.opris@googlemail.com
> wrote:

> Hi,
>
> I was wondering if there's any planned support for local disk columnar
> storage.
>
> This could be an extension of the in-memory columnar store, or possibly
> something similar to the recently added local checkpointing for RDDs
>
> This could also have the added benefit of enabling iterative usage for
> DataFrames by pruning the query plan through local checkpoints.
>
> A further enhancement would be to add update support to the columnar
> format (in the immutable copy-on-write sense of course), by maintaining
> references to unchanged row blocks and only copying and mutating the ones
> that have changed.
>
> A use case here is streaming and merging updates in a large dataset that
> can be efficiently stored internally in a columnar format, rather than
> accessing a less efficient external data store like HDFS or Cassandra.
>
> Thanks,
> Cristian
>