Posted to dev@iceberg.apache.org by Erik Wright <er...@shopify.com.INVALID> on 2018/12/01 02:23:00 UTC

Re: merge-on-read?

Hi Ryan, Owen,

Just following up on this question. Implemented properly, do you see any
reason that a series of PRs to implement merge-on-read support wouldn't be
welcomed?

Thanks,

Erik

On Wed., Nov. 28, 2018, 5:25 p.m. Erik Wright <erik.wright@shopify.com> wrote:

>
>
> On Wed, Nov 28, 2018 at 4:32 PM Owen O'Malley <ow...@gmail.com>
> wrote:
>
>> For Hive's ACID, we started with deltas that had three options per row:
>> insert, delete, edit. Since that didn't enable predicate push down in the
>> common case where there is a large number of inserts, we went to the model
>> of just using inserts and deletes in separate files. Queries that modify
>> tables delete the old row and insert a new one. That allowed us to get good
>> performance for reads, where it is most critical. There are some important
>> optimizations; for example, with a small number of deletes, you can read
>> all of the deletes into memory and close that file.
>>
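
A minimal sketch of the small-delete-set optimization described above, assuming
row identity is a single long id; the Row interface and reader class here are
placeholders rather than Hive's actual types:

import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

interface Row {
  long rowId();  // placeholder row identity; real ACID row ids are richer
}

final class DeleteFilteringReader implements Iterator<Row> {
  private final Iterator<Row> inserts;    // base rows plus insert deltas
  private final Set<Long> deletedRowIds;  // the whole delete delta, read up front
  private Row next;

  DeleteFilteringReader(Iterator<Row> inserts, Iterable<Long> deleteDelta) {
    this.inserts = inserts;
    this.deletedRowIds = new HashSet<>();
    deleteDelta.forEach(this.deletedRowIds::add);  // the delete file can be closed now
    advance();
  }

  private void advance() {
    next = null;
    while (inserts.hasNext()) {
      Row candidate = inserts.next();
      if (!deletedRowIds.contains(candidate.rowId())) {  // skip deleted rows
        next = candidate;
        return;
      }
    }
  }

  @Override
  public boolean hasNext() {
    return next != null;
  }

  @Override
  public Row next() {
    if (next == null) {
      throw new NoSuchElementException();
    }
    Row result = next;
    advance();
    return result;
  }
}
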
>
> Presumably predicate pushdown can still be supported if the deltas are
> partitioned similarly to the base dataset? Or is the issue about predicates
> on fields that might change between two versions of a row?
>
> If I understand correctly, we already do what you ended up with: when a
> row is updated in a way that moves it between partitions we record a delete
> for the partition that it was removed from and an insertion in the
> partition it was inserted into.
>
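
A tiny illustration of the bookkeeping described above: an update that moves a
row across partitions is recorded as a delete against the old partition plus an
insert into the new one (the record names here are made up for illustration):

import java.util.List;
import java.util.Map;

final class CrossPartitionMove {
  record Delete(List<Object> key, String partition) {}
  record Insert(Map<String, Object> row, String partition) {}
  record Change(Delete delete, Insert insert) {}

  // The moved row's key is deleted from oldPartition and its new version is
  // inserted into newPartition; nothing else in either partition is touched.
  static Change moveRow(List<Object> key, Map<String, Object> newVersion,
                        String oldPartition, String newPartition) {
    return new Change(new Delete(key, oldPartition),
                      new Insert(newVersion, newPartition));
  }
}
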
> I personally favour inserts/deletes in separate files because it allows
> the schema of your insert files to be consistent with the dataset schema
> (with respect to nullability).
>
> The delete optimization sounds clever.
>
> .. Owen
>>
>> On Wed, Nov 28, 2018 at 1:14 PM Erik Wright <erik.wright@shopify.com
>> .invalid>
>> wrote:
>>
>> > Those are both really neat use cases, but the one I had in mind was what
>> > Ryan mentioned. It's something that Hoodie apparently supports or is
>> > building support for, and it's an important use case for the systems that
>> > my colleagues and I are building.
>> >
>> > There are three scenarios:
>> >
>> >    - An Extract system that is receiving updates/deletes from a source
>> >    system. We wish to capture them as quickly as possible and make them
>> >    available to users without having to restate the affected data files.
>> >    The update patterns are not anything that can be addressed with
>> >    partitioning.
>> >    - A Transform platform that is running a graph of jobs. For some jobs
>> >    that are rebuilt from scratch, we would like to compress the output
>> >    without losing the history.
>> >    - A Transform / Load system that is building tables on GCS and
>> >    registering them in Hive for querying by Presto. This system is
>> >    incrementally updating views, and while some of those views are
>> >    event-oriented (with most updates clustered in recent history) some of
>> >    them are not, and in those cases there is no partitioning algorithm
>> >    that will prevent us from updating virtually all partitions in every
>> >    update.
>> >
>> > We have one example of an internal solution but would prefer something
>> > less bespoke. That system works as follows (a rough code sketch follows
>> > the list):
>> >
>> >    1. For each dataset, unique key columns are defined.
>> >    2. Datasets are partitioned (not necessarily by anything in the key).
>> >    3. Upserts/deletes are captured in a mutation set.
>> >    4. The mutation set is used to update affected partitions:
>> >       1. Identify the previous/new partition for each upserted/deleted row.
>> >       2. Open the affected partitions, drop all rows matching an
>> >       upserted/deleted key.
>> >       3. Append all upserts.
>> >       4. Write out the result.
>> >    5. We maintain an index (effectively an Iceberg snapshot) that says
>> >    which partitions come from where (we keep the ones that are unaffected
>> >    from the previous dataset version and add in the updated ones).
>> >
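
A rough sketch in code of step 4 above, assuming partitions are held in memory
as lists of rows and each mutation already carries the row's previous and new
partition; all names and types are illustrative, not the internal system's:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MutationSetApplier {
  record Row(List<Object> key, Map<String, Object> columns) {}

  // upsert == null means the key is deleted; newPartition is null in that case
  record Mutation(List<Object> key, Row upsert,
                  String previousPartition, String newPartition) {}

  static Map<String, List<Row>> apply(Map<String, List<Row>> partitions,
                                      List<Mutation> mutationSet) {
    // 4.1: collect the partitions touched by any upserted or deleted key
    Set<String> affected = new HashSet<>();
    Set<List<Object>> mutatedKeys = new HashSet<>();
    for (Mutation m : mutationSet) {
      mutatedKeys.add(m.key());
      if (m.previousPartition() != null) affected.add(m.previousPartition());
      if (m.newPartition() != null) affected.add(m.newPartition());
    }

    // Step 5: unaffected partitions are carried over from the previous version
    Map<String, List<Row>> result = new HashMap<>(partitions);

    for (String p : affected) {
      // 4.2: open the partition and drop every row whose key was mutated
      List<Row> rewritten = new ArrayList<>();
      for (Row r : partitions.getOrDefault(p, List.of())) {
        if (!mutatedKeys.contains(r.key())) {
          rewritten.add(r);
        }
      }
      // 4.3: append the upserts that land in this partition
      for (Mutation m : mutationSet) {
        if (m.upsert() != null && p.equals(m.newPartition())) {
          rewritten.add(m.upsert());
        }
      }
      // 4.4: "write out" the rewritten partition
      result.put(p, rewritten);
    }
    return result;
  }
}
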
>> > This data is loaded into Presto and our current plan is to update it by
>> > registering a view in Presto that applies recent mutation sets to the
>> > latest merged version on the fly.
>> >
>> > So to build this in Iceberg we would likely need to extend the Table spec
>> > with the following (a read-side sketch in code follows the list):
>> >
>> >    - An optional unique key specification, possibly composite, naming one
>> >    or more columns for which there is expected to be at most one row per
>> >    unique value.
>> >    - The ability to indicate in the snapshot that a certain set of
>> >    manifests are "base" data while other manifests are "diffs".
>> >    - The ability in a "diff" manifest to indicate files that contain
>> >    "deleted" keys (or else the ability in a given row to have a special
>> >    column that indicates that the row is a "delete" and not an "upsert").
>> >    - "diff" manifests would need to be ordered in the snapshot (as
>> >    multiple "diff" manifests could affect a single row and only the
>> >    latest of those takes effect).
>> >
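
A hypothetical read-side sketch of how base data plus ordered "diff" manifests
could be merged by unique key (later diffs win, and a delete marker suppresses
the key entirely); none of these types exist in Iceberg, they only illustrate
the proposal:

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MergeOnReadSketch {
  // isDelete marks a "deleted" key coming from a diff's delete file or column
  record KeyedRow(List<Object> uniqueKey, Map<String, Object> columns, boolean isDelete) {}

  static Collection<Map<String, Object>> merge(Iterable<KeyedRow> baseRows,
                                               List<Iterable<KeyedRow>> orderedDiffs) {
    Map<List<Object>, Map<String, Object>> current = new LinkedHashMap<>();
    for (KeyedRow row : baseRows) {
      current.put(row.uniqueKey(), row.columns());  // base: at most one row per key
    }
    // Diffs are applied in snapshot order, so only the latest change to a key wins.
    for (Iterable<KeyedRow> diff : orderedDiffs) {
      for (KeyedRow row : diff) {
        if (row.isDelete()) {
          current.remove(row.uniqueKey());
        } else {
          current.put(row.uniqueKey(), row.columns());  // upsert replaces prior version
        }
      }
    }
    return current.values();
  }
}
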
>> > Obviously readers would need to be updated to correctly interpret this
>> > data. And all kinds of supporting work would be required in
>> > order to maintain these (periodically collapsing diffs into the base,
>> > etc.).
>> >
>> > Is this something for which PRs would be accepted, assuming all of the
>> > necessary steps are taken to make sure the direction is compatible with
>> > Iceberg's other use-cases?
>> >
>> > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <ow...@gmail.com>
>> > wrote:
>> >
>> > > I’m not sure what use case Erik is looking for, but I’ve had users that
>> > > want to do the equivalent of HBase’s column families. They want some of
>> > > the columns to be stored separately and then merged together on read.
>> > > The requirement would be that there is a 1:1 mapping between rows in the
>> > > matching files and stripes.
>> > >
>> > > It would look like:
>> > >
>> > > file1.orc: struct<name:string,email:string>
>> > > file2.orc: struct<lastAccess:timestamp>
>> > >
>> > > It would let them leave the stable information in place and only re-write
>> > > the second column family when the information in the mutable column family
>> > > changes. It would also support use cases where you add data enrichment
>> > > columns after the data has been ingested.
>> > >
>> > > From there it is easy to imagine having a replace operator where file2’s
>> > > version of a column replaces file1’s version.
>> > >
>> > > .. Owen
>> > >
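
A minimal sketch of that kind of column-family merge at read time, relying on
the 1:1 row correspondence between the two files; the iterator-of-maps row
representation is a placeholder, not ORC's reader API:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class ColumnFamilyMergingReader implements Iterator<Map<String, Object>> {
  private final Iterator<Map<String, Object>> stableFamily;   // e.g. file1.orc: name, email
  private final Iterator<Map<String, Object>> mutableFamily;  // e.g. file2.orc: lastAccess

  ColumnFamilyMergingReader(Iterator<Map<String, Object>> stableFamily,
                            Iterator<Map<String, Object>> mutableFamily) {
    this.stableFamily = stableFamily;
    this.mutableFamily = mutableFamily;
  }

  @Override
  public boolean hasNext() {
    // The 1:1 mapping means both families advance in lockstep.
    return stableFamily.hasNext() && mutableFamily.hasNext();
  }

  @Override
  public Map<String, Object> next() {
    Map<String, Object> merged = new HashMap<>(stableFamily.next());
    merged.putAll(mutableFamily.next());  // the second family's columns (or replacements) win
    return merged;
  }
}
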
>> > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue <rb...@netflix.com.INVALID>
>> > > wrote:
>> > > >
>> > > > What do you mean by merge on read?
>> > > >
>> > > > A few people I've talked to are interested in building delete and
>> > > > upsert features. Those would create files that track the changes,
>> > > > which would be merged at read time to apply them. Is that what you
>> > > > mean?
>> > > >
>> > > > rb
>> > > >
>> > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright
>> > > > <er...@shopify.com.invalid> wrote:
>> > > >
>> > > >> Has any consideration been given to the possibility of eventual
>> > > >> merge-on-read support in the Iceberg table spec?
>> > > >>
>> > > >
>> > > >
>> > > > --
>> > > > Ryan Blue
>> > > > Software Engineer
>> > > > Netflix
>> > >
>> > >
>> >
>>
>

Re: merge-on-read?

Posted by Erik Wright <er...@shopify.com.INVALID>.
Thanks for the feedback, Ryan. We're sorting through a few ideas for how to
move forward and this helps in our thinking.

On Fri, Nov 30, 2018 at 10:10 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> I think the community would welcome these contributions. I've talked with a
> couple of companies about this sort of thing already, so there are other
> people that could collaborate on it.
>

Re: merge-on-read?

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I think the community would welcome these contributions. I've talked with a
couple of companies about this sort of thing already, so there are other
people that could collaborate on it.

On Fri, Nov 30, 2018 at 6:23 PM Erik Wright <er...@shopify.com.invalid>
wrote:

> Hi Ryan, Owen,
>
> Just following up on this question. Implemented properly, do you see any
> reason that a series of PRs to implement merge-on-read support wouldn't be
> welcomed?
>
> Thanks,
>
> Erik
>


-- 
Ryan Blue
Software Engineer
Netflix