Posted to dev@iceberg.apache.org by Anton Okolnychyi <ao...@apple.com.INVALID> on 2019/05/06 11:18:28 UTC

Micro-Batch Streaming

Hi,

I would like to discuss the support for micro-batch streaming in Iceberg.

First of all, do we think micro-batch use-cases are appropriate for Iceberg? What do we consider as "slow-moving data"? Do we want to support batch intervals of 30s? What about intervals of 1m/2m/5m? The latter seems doable as we already have FastAppend and other optimizations in place.

In particular, I am interested in Spark structured streaming. I have a few things I want to discuss, but let's confirm it is appropriate for Iceberg.

Thanks,
Anton

Re: Micro-Batch Streaming

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
> I'm reluctant to do this without an explicit call from the user or in a service. The problem is when to expire snapshots. Iceberg is called regularly to read and write tables. That might seem like a good time to expire snapshots, but it doesn't make sense for either one to have a side effect of physically deleting data files and discarding metadata. That's going beyond user expectations to do destructive tasks. Plus, it changes the guarantees of those operations, where reads should be as fast as possible and there may be guarantees relying on writes not doing additional operations that could cause failures.

Yep, makes sense. It is better to explain to users why snapshots need to be expired and let them decide when to do it.

> For Flink, we're creating a UUID for each checkpoint that writes files, writing that into the snapshot summary, and then checking whether a known snapshot had that ID when the write resumes after a failure. That sounds like what you're suggesting here, but using queryId/epochId as the write ID. Sounds like a good plan to me.

Alright, I created two issues:
- https://github.com/apache/incubator-iceberg/issues/178 (sink)
- https://github.com/apache/incubator-iceberg/issues/179 (source)

Thanks,
Anton


> On 6 May 2019, at 23:30, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> 
> Replies inline.
> 
> On Mon, May 6, 2019 at 3:01 PM Anton Okolnychyi <aokolnychyi@apple.com> wrote:
> I am also wondering whether it makes sense to have a config that limits the number of snapshots we want to track. This config can be based on the number of snapshots (e.g. keep only 10000 snapshots) or based on time (e.g. keep snapshots for the last 7 days). We can implement both, actually. AFAIK, the expiration of snapshots is manual right now. Would it make sense to control this via config options or do we expect that users do this?
> 
> I'm reluctant to do this without an explicit call from the user or in a service. The problem is when to expire snapshots. Iceberg is called regularly to read and write tables. That might seem like a good time to expire snapshots, but it doesn't make sense for either one to have a side effect of physically deleting data files and discarding metadata. That's going beyond user expectations to do destructive tasks. Plus, it changes the guarantees of those operations, where reads should be as fast as possible and there may be guarantees relying on writes not doing additional operations that could cause failures.
>  
> Spark provides queryId and epochId/batchId to all sinks, which must ensure that all writes are idempotent. Spark might try to commit the same batch multiple times. So, we need to know the latest committed batchId for every query. One option is to store this information in the table metadata. However, this breaks time traveling and rollbacks. We need to have this mapping per snapshot. Snapshot summary seems like a reasonable choice. Would it make sense to do something similar to “total-records” and “total-files” to keep the latest committed batch id for each query? Any other ideas are welcome.
> 
> For Flink, we're creating a UUID for each checkpoint that writes files, writing that into the snapshot summary, and then checking whether a known snapshot had that ID when the write resumes after a failure. That sounds like what you're suggesting here, but using queryId/epochId as the write ID. Sounds like a good plan to me.
> 
> rb
>  
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Micro-Batch Streaming

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Replies inline.

On Mon, May 6, 2019 at 3:01 PM Anton Okolnychyi <ao...@apple.com>
wrote:

> I am also wondering whether it makes sense to have a config that limits
> the number of snapshots we want to track. This config can be based on the
> number of snapshots (e.g. keep only 10000 snapshots) or based on time (e.g.
> keep snapshots for the last 7 days). We can implement both, actually.
> AFAIK, the expiration of snapshots is manual right now. Would it make sense
> to control this via config options or do we expect that users do this?
>

I'm reluctant to do this without an explicit call from the user or in a
service. The problem is when to expire snapshots. Iceberg is called
regularly to read and write tables. That might seem like a good time to
expire snapshots, but it doesn't make sense for either one to have a side
effect of physically deleting data files and discarding metadata. That's
going beyond user expectations to do destructive tasks. Plus, it changes
the guarantees of those operations, where reads should be as fast as
possible and there may be guarantees relying on writes not doing additional
operations that could cause failures.


> Spark provides queryId and epochId/batchId to all sinks, which must ensure
> that all writes are idempotent. Spark might try to commit the same batch
> multiple times. So, we need to know the latest committed batchId for every
> query. One option is to store this information in the table metadata.
> However, this breaks time traveling and rollbacks. We need to have this
> mapping per snapshot. Snapshot summary seems like a reasonable choice.
> Would it make sense to do something similar to “total-records” and “total-files”
> to keep the latest committed batch id for each query? Any other ideas are
> welcome.
>

For Flink, we're creating a UUID for each checkpoint that writes files,
writing that into the snapshot summary, and then checking whether a known
snapshot had that ID when the write resumes after a failure. That sounds
like what you're suggesting here, but using queryId/epochId as the write
ID. Sounds like a good plan to me.
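
To make the checkpoint check above concrete, here is a minimal sketch in Python. It is a toy model, not the Flink sink code: the summary key name, the CheckpointState class and both functions are hypothetical, and a plain list of dicts stands in for the table's snapshot summaries.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, List, Tuple

# Hypothetical summary key; the thread does not name the property.
WRITE_ID_KEY = "checkpoint-write-id"


@dataclass(frozen=True)
class CheckpointState:
    """What a checkpoint would persist before attempting the commit."""
    write_id: str
    files: Tuple[str, ...]


def prepare_checkpoint(files: List[str]) -> CheckpointState:
    # A fresh UUID per checkpoint that writes files.
    return CheckpointState(write_id=str(uuid.uuid4()), files=tuple(files))


def commit_checkpoint(summaries: List[Dict[str, str]], state: CheckpointState) -> bool:
    """Commit unless a known snapshot already carries this write ID.

    `summaries` stands in for the snapshot summaries of the table.
    Returns True if a snapshot was added, False if this was a replay.
    """
    if any(s.get(WRITE_ID_KEY) == state.write_id for s in summaries):
        return False
    summaries.append({WRITE_ID_KEY: state.write_id})
    return True
```

When a job resumes after a failure, it would call commit_checkpoint again with the restored state; the second call is a no-op if the first one actually reached the table.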

rb

-- 
Ryan Blue
Software Engineer
Netflix

Re: Micro-Batch Streaming

Posted by Anton Okolnychyi <ao...@apple.com.INVALID>.
That’s good news, Ryan. Your observations are also aligned with some benchmarks I performed earlier.

I am also wondering whether it makes sense to have a config that limits the number of snapshots we want to track. This config can be based on the number of snapshots (e.g. keep only 10000 snapshots) or based on time (e.g. keep snapshots for the last 7 days). We can implement both, actually. AFAIK, the expiration of snapshots is manual right now. Would it make sense to control this via config options or do we expect that users do this?
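
As a rough illustration of the two limits, the sketch below only selects which snapshots would be expired; it deletes nothing. The SnapshotRef class, the select_expired function and its parameters are hypothetical names, and combining the two rules as "expire if either limit is exceeded" is an assumption, not something settled here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SnapshotRef:
    snapshot_id: int
    timestamp_ms: int


def select_expired(
    snapshots: List[SnapshotRef],
    now_ms: int,
    max_count: Optional[int] = None,
    max_age_ms: Optional[int] = None,
) -> List[SnapshotRef]:
    """Return the snapshots to expire, oldest first.

    The newest snapshot is always kept. A snapshot is selected if it falls
    outside the `max_count` newest snapshots or is older than `max_age_ms`.
    """
    ordered = sorted(snapshots, key=lambda s: s.timestamp_ms)
    expired = []
    for idx, snap in enumerate(ordered[:-1]):  # never expire the newest one
        kept_if_retained = len(ordered) - idx  # this snapshot plus all newer ones
        too_many = max_count is not None and kept_if_retained > max_count
        too_old = max_age_ms is not None and now_ms - snap.timestamp_ms > max_age_ms
        if too_many or too_old:
            expired.append(snap)
    return expired
```

For example, select_expired(snaps, now_ms, max_count=10000) models the count-based option and select_expired(snaps, now_ms, max_age_ms=7 * 86_400_000) models the 7-day option.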

Let’s talk about providing a sink for structured streaming.

Spark provides queryId and epochId/batchId to all sinks, which must ensure that all writes are idempotent. Spark might try to commit the same batch multiple times. So, we need to know the latest committed batchId for every query. One option is to store this information in the table metadata. However, this breaks time traveling and rollbacks. We need to have this mapping per snapshot. Snapshot summary seems like a reasonable choice. Would it make sense to do something similar to “total-records” and “total-files” to keep the latest committed batch id for each query? Any other ideas are welcome.
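
To illustrate why keeping the mapping per snapshot survives rollbacks, here is a small in-memory sketch. Everything in it is hypothetical (the Table and Snapshot classes, the two summary keys, the function names), and it finds the latest epoch by walking the ancestors of the current snapshot rather than carrying the value forward the way “total-records” is carried; either variant keeps the information tied to a snapshot.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical summary keys, not defined by Iceberg.
QUERY_ID_KEY = "streaming.query-id"
EPOCH_ID_KEY = "streaming.epoch-id"


@dataclass
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    files: List[str]
    summary: Dict[str, str] = field(default_factory=dict)


class Table:
    """Toy table: all snapshots plus a pointer to the current one."""

    def __init__(self) -> None:
        self.snapshots: Dict[int, Snapshot] = {}
        self.current_id: Optional[int] = None

    def append(self, files: List[str], summary: Dict[str, str]) -> int:
        new_id = len(self.snapshots) + 1  # snapshots are never removed here
        self.snapshots[new_id] = Snapshot(new_id, self.current_id, list(files), dict(summary))
        self.current_id = new_id
        return new_id

    def rollback_to(self, snapshot_id: int) -> None:
        if snapshot_id not in self.snapshots:
            raise KeyError(snapshot_id)
        self.current_id = snapshot_id


def last_committed_epoch(table: Table, query_id: str) -> Optional[int]:
    """Newest epoch recorded for `query_id` among the current snapshot's ancestors."""
    cursor = table.current_id
    while cursor is not None:
        snap = table.snapshots[cursor]
        if snap.summary.get(QUERY_ID_KEY) == query_id:
            return int(snap.summary[EPOCH_ID_KEY])
        cursor = snap.parent_id
    return None


def commit_batch(table: Table, query_id: str, epoch_id: int, files: List[str]) -> bool:
    """Idempotent commit: skip the batch if this epoch (or a later one) is already in."""
    last = last_committed_epoch(table, query_id)
    if last is not None and epoch_id <= last:
        return False
    table.append(files, {QUERY_ID_KEY: query_id, EPOCH_ID_KEY: str(epoch_id)})
    return True
```

After a rollback, last_committed_epoch only sees snapshots that are still ancestors of the current one, so a batch that was rolled back can be committed again. A single table-level property would still report the rolled-back epoch.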

Thanks,
Anton


> On 6 May 2019, at 20:09, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> 
> We've been building pipelines that write to Iceberg tables from Flink. Right now, we have applications deployed across 3 AWS regions and have them committing every 10 minutes. We also have an application that monitors the tables and moves files from remote regions into the region where we run our Hadoop clusters, and one that is automatically merging small files in the background.
> 
> What we see is commits about every 2-3 minutes on average, with some periodic conflicts when writes across regions happen at the same time. The minimum number of seconds between commits is 1 and it isn't uncommon to see commits less than 10 seconds apart. My interpretation of this is that commit retries for appends are reasonably fast -- fast enough to support streaming writes a few minutes apart.
> 
> I think these stats mean that we could definitely support structured streaming use cases. And, we could also use a table's snapshot history to support reading from an Iceberg table as a streaming source.
> 
> On Mon, May 6, 2019 at 4:18 AM Anton Okolnychyi <ao...@apple.com.invalid> wrote:
> Hi,
> 
> I would like to discuss the support for micro-batch streaming in Iceberg.
> 
> First of all, do we think micro-batch use-cases are appropriate for Iceberg? What do we consider as "slow-moving data"? Do we want to support batch intervals of 30s? What about intervals of 1m/2m/5m? The latter seems doable as we already have FastAppend and other optimizations in place.
> 
> In particular, I am interested in Spark structured streaming. I have a few things I want to discuss, but let's confirm it is appropriate for Iceberg.
> 
> Thanks,
> Anton
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix


Re: Micro-Batch Streaming

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
We've been building pipelines that write to Iceberg tables from Flink.
Right now, we have applications deployed across 3 AWS regions and have them
committing every 10 minutes. We also have an application that monitors the
tables and moves files from remote regions into the region where we run our
Hadoop clusters, and one that is automatically merging small files in the
background.

What we see is commits about every 2-3 minutes on average, with some
periodic conflicts when writes across regions happen at the same time. The
minimum number of seconds between commits is 1 and it isn't uncommon to see
commits less than 10 seconds apart. My interpretation of this is that
commit retries for appends are reasonably fast -- fast enough to support
streaming writes a few minutes apart.
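
For readers unfamiliar with why append retries are cheap, here is a toy model of an optimistic commit loop. It is a sketch under assumptions, not Iceberg's implementation: TableRef, CommitConflict and append_with_retry are hypothetical names, and a version counter guarded by a lock stands in for the atomic swap of the table metadata pointer.

```python
import threading
from typing import Iterable, Tuple


class CommitConflict(Exception):
    """Raised when the table changed between load and swap."""


class TableRef:
    """Toy table pointer with an atomic compare-and-swap on the version."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._version = 0
        self._files: Tuple[str, ...] = ()

    def load(self) -> Tuple[int, Tuple[str, ...]]:
        with self._lock:
            return self._version, self._files

    def swap(self, expected_version: int, new_files: Iterable[str]) -> None:
        with self._lock:
            if self._version != expected_version:
                raise CommitConflict(f"expected v{expected_version}, found v{self._version}")
            self._version += 1
            self._files = tuple(new_files)


def append_with_retry(ref: TableRef, new_files: Iterable[str], max_attempts: int = 4) -> int:
    """Append files, retrying on conflict. Returns the number of attempts used.

    A retry only reloads the current state and re-applies the append; the
    data files themselves are already written and are not touched again.
    """
    added = tuple(new_files)
    for attempt in range(1, max_attempts + 1):
        version, files = ref.load()
        try:
            ref.swap(version, files + added)
            return attempt
        except CommitConflict:
            continue  # another writer won the race; refresh and try again
    raise CommitConflict(f"gave up after {max_attempts} attempts")
```

Because an append does not depend on which files other writers added, the retry is just a refresh plus another swap, which fits the short gaps between commits described above.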

I think these stats mean that we could definitely support structured
streaming use cases. And, we could also use a table's snapshot history to
support reading from an Iceberg table as a streaming source.
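
A minimal sketch of the streaming-source idea, assuming the snapshot history is available as an ordered list and each snapshot records its operation and the files it added. The Snapshot class, the operation strings and new_batches are hypothetical names for illustration; a real source would also need to define how non-append snapshots are handled.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    operation: str  # e.g. "append", or "replace" for a small-file merge
    added_files: Tuple[str, ...]


def new_batches(
    history: List[Snapshot], last_processed_id: Optional[int]
) -> Iterator[Tuple[int, Tuple[str, ...]]]:
    """Yield (snapshot_id, added_files) for each append after `last_processed_id`.

    `history` is ordered oldest first. Snapshots that only rewrite existing
    data (here: anything that is not an append) are skipped so rows are not
    emitted twice. Raises ValueError if the last processed snapshot is no
    longer in the history, for example because it was expired.
    """
    start = 0
    if last_processed_id is not None:
        ids = [s.snapshot_id for s in history]
        start = ids.index(last_processed_id) + 1
    for snap in history[start:]:
        if snap.operation == "append":
            yield snap.snapshot_id, snap.added_files
```

The reader would persist the last snapshot ID it processed as its offset and pass it back in on the next trigger.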

On Mon, May 6, 2019 at 4:18 AM Anton Okolnychyi
<ao...@apple.com.invalid> wrote:

> Hi,
>
> I would like to discuss the support for micro-batch streaming in Iceberg.
>
> First of all, do we think micro-batch use-cases are appropriate for
> Iceberg? What do we consider as "slow-moving data"? Do we want to support
> batch intervals of 30s? What about intervals of 1m/2m/5m? The latter seems
> doable as we already have FastAppend and other optimizations in place.
>
> In particular, I am interested in Spark structured streaming. I have a few
> things I want to discuss, but let's confirm it is appropriate for Iceberg.
>
> Thanks,
> Anton
>


-- 
Ryan Blue
Software Engineer
Netflix