Posted to dev@iceberg.apache.org by Gautam <ga...@gmail.com> on 2020/09/08 16:06:24 UTC

Timestamp Based Incremental Reading in Iceberg ...

Hello Devs,
We are looking into adding workflows that read data incrementally based on
commit time: the ability to read deltas between start / end commit
timestamps on a table, and the ability to resume reading from the last-read
end timestamp. For that, we need the timestamps to be linear in the current
active snapshot history (newer versions always have higher timestamps).
Although the Iceberg commit flow ensures the versions are newer, there isn't
a check to ensure timestamps are linear.

Example flow: if two clients (clientA and clientB) whose clocks are
slightly off (say by a couple of seconds) are committing frequently, clientB
might commit after clientA even though its new snapshot's timestamp is
earlier than clientA's, leaving the timestamps out of order. I might be
wrong, but I haven't found a check in HadoopTableOperations.commit() to
ensure this case does not happen.
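
The scenario above can be illustrated with a small simulation. This is only
a sketch in plain Python, not Iceberg code: the Snapshot class, the commit()
helper, and the clock offsets are all hypothetical stand-ins chosen to show
how commit order and timestamp order can diverge.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    snapshot_id: int
    timestamp_ms: int
    committed_by: str


def commit(history, client, true_time_ms, clock_offset_ms):
    """Append a snapshot stamped with the client's local (possibly skewed) clock."""
    snap = Snapshot(len(history) + 1, true_time_ms + clock_offset_ms, client)
    history.append(snap)
    return snap


def out_of_order(history):
    """Return (earlier, later) commit pairs where the later commit has a smaller timestamp."""
    return [(a, b) for a, b in zip(history, history[1:])
            if b.timestamp_ms < a.timestamp_ms]


history = []
commit(history, "clientA", true_time_ms=10_000, clock_offset_ms=0)
# clientB commits later in real time, but its clock runs 2 seconds behind.
commit(history, "clientB", true_time_ms=10_500, clock_offset_ms=-2_000)
commit(history, "clientA", true_time_ms=11_000, clock_offset_ms=0)

violations = out_of_order(history)
for earlier, later in violations:
    print(f"snapshot {later.snapshot_id} by {later.committed_by} "
          f"has timestamp {later.timestamp_ms} < {earlier.timestamp_ms}")
```

A reader that resumes from "last read end timestamp" 10_000 would never see
clientB's snapshot here, because its timestamp (8_500) is already in the past.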

On the other hand, rejecting commits because of out-of-order timestamps can
hurt commit throughput, so I can see why Iceberg might not want to enforce
this based on System.currentTimeMillis(). However, if clients had a way to
define their own globally synchronized timestamps (using an external service
or some monotonically increasing UUID), then Iceberg could expose an API to
set that on the snapshot or use it instead of System.currentTimeMillis().
Iceberg exposes something similar with sequence numbers in the v2 format,
which track deletes and appends.

Is this a concern others have? If so how are folks handling this today or
are they not exposing such a feature at all due to the inherent distributed
timing problem? Would like to hear how others are thinking/going about
this. Thoughts?

Cheers,

-Gautam.

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
Newbie here, but if I understand correctly, the client knows the previous
snapshot and the corresponding timestamp. It could be the responsibility of
the client to generate a new timestamp that is higher than or equal to the
previous one. There might be checks implemented on commit to prevent
smaller timestamps. If the timestamp generated by the client's clock is
smaller than the previous one, then it should use the previous one.

I see one issue with the algorithm above:
- If someone uses a wrong, high timestamp, then all of the following
timestamps will be equal to this high value. Would this be an issue in your
case? Should we use previous+1 (in epoch milliseconds) instead of the
previous value when the timestamp is wrong, or would the timestamp then
become too artificial?

Any other flaws with the algorithm above?
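
The clamping rule described above, and the issue with it, can be sketched as
follows. This is a minimal illustration in plain Python; next_timestamp() and
stamp_all() are hypothetical helper names, not part of any Iceberg API, and
the strict flag models the previous+1 variant.

```python
def next_timestamp(previous_ms, clock_ms, strict=False):
    """Never go backwards: use the clock reading unless it is behind the previous snapshot.

    With strict=True the floor is previous+1, so timestamps are strictly increasing.
    """
    floor = previous_ms + 1 if strict else previous_ms
    return max(clock_ms, floor)


def stamp_all(clock_readings_ms, strict=False):
    """Apply the rule to a sequence of commits, given each committer's clock reading."""
    stamps = []
    previous = None
    for reading in clock_readings_ms:
        ts = reading if previous is None else next_timestamp(previous, reading, strict)
        stamps.append(ts)
        previous = ts
    return stamps


# The second committer has a badly wrong clock, far in the future.
readings = [1_000, 9_000_000, 1_200, 1_300]

pinned = stamp_all(readings)                    # later commits all repeat the bad value
increasing = stamp_all(readings, strict=True)   # later commits count up from the bad value
print(pinned)
print(increasing)
```

Either way, once a single bad timestamp is committed, every later timestamp
is derived from it rather than from a real clock until real time catches up.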

Thanks, Peter

On Wed, Sep 9, 2020 at 4:55 AM OpenInx <op...@gmail.com> wrote:

> I agree that it's helpful to allow users to read incremental deltas
> based on timestamp; as Jingsong said, a timestamp is more friendly.
>
> My question is how to implement this.
>
> If we just attach the client's timestamp to the Iceberg table when
> committing, then different clients may have different timestamp values
> because of clock skew. In theory, these time values are not strictly
> comparable, and can only be compared within the margin of error.
>
>
> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <ji...@gmail.com>
> wrote:
>
>> +1 for linear timestamps. In the implementation, maybe the writer only
>> needs to look at the previous snapshot's timestamp.
>>
>> We're trying to think of Iceberg as a message queue. Let's take the
>> popular queue Kafka as an example.
>> Iceberg has snapshotId and timestamp; correspondingly, Kafka has offset
>> and timestamp:
>> - offset: used for incremental reads, such as the state of a
>> checkpoint in a computing system.
>> - timestamp: explicitly specified by the user to define the scope
>> of consumption, e.g. as the start_timestamp of a read. A timestamp is a
>> more user-friendly interface, whereas an offset/snapshotId is not human
>> readable.
>>
>> So there are scenarios where timestamp is used for incremental read.
>>
>> Best,
>> Jingsong
>>
>>
>> On Wed, Sep 9, 2020 at 12:45 AM Sud <su...@gmail.com> wrote:
>>
>>>
>>> We are using incremental reads for Iceberg tables which get quite a few
>>> appends (~500-1000 per hour), but instead of using timestamps we use
>>> snapshot IDs and track the state of the last-read snapshot ID.
>>> We use the timestamp as a fallback when the state is incorrect, but as
>>> you mentioned, if timestamps are linear then it works as expected.
>>> We also found that the incremental reader might be slow when dealing with
>>> more than 2k snapshots in range. We are currently testing a manifest-based
>>> incremental reader which looks at manifest entries instead of scanning
>>> snapshot history and accessing each snapshot.
>>>
>>> Is there any reason you can't use snapshot based incremental read?
>>
>> --
>> Best, Jingsong Lee
>>
>

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
> you never know if that's the same order in which the writers will commit
> (when you have multiple writers in the system)

That's exactly the problem with trying to rely on timestamps. Even if you
can coordinate the timestamps themselves, commits could still be out of
order because of differences in how long it takes to write the final
metadata. I think the right approach for incremental consumption is to take
the current snapshot and traverse back through history until you find the
last snapshot that you processed. That way, you know that you have the full
history.
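
The traversal described above can be sketched as a walk over parent pointers.
This is an illustrative model in plain Python, not the Iceberg API: the
Snapshot class and snapshots_since() are hypothetical, and the only
assumption taken from Iceberg is that each snapshot records the ID of its
parent.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]


def snapshots_since(snapshots: Dict[int, Snapshot],
                    current_id: int,
                    last_processed_id: Optional[int]) -> List[int]:
    """Walk back from the current snapshot until the last processed one is found.

    Returns the unprocessed snapshot IDs, oldest first. Timestamps are never consulted.
    """
    pending: List[int] = []
    cursor: Optional[int] = current_id
    while cursor is not None and cursor != last_processed_id:
        pending.append(cursor)
        cursor = snapshots[cursor].parent_id
    if last_processed_id is not None and cursor is None:
        # Reached the root without meeting the checkpoint: history was rewritten or expired.
        raise ValueError("last processed snapshot is not an ancestor of the current snapshot")
    pending.reverse()
    return pending


# Snapshot IDs are deliberately not ordered; only the parent links define history.
table = {
    101: Snapshot(101, None),
    205: Snapshot(205, 101),
    77: Snapshot(77, 205),
    310: Snapshot(310, 77),
}

to_read = snapshots_since(table, current_id=310, last_processed_id=205)
print(to_read)
```

Because the walk either reaches the checkpoint or fails loudly, the consumer
knows it has the full history between the two snapshots.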

The transaction ID idea is more for committing data exactly-once or for
coordinating with other systems. You can put an external identifier in
snapshot metadata and use that to check whether a particular commit has
already succeeded. You can also use it to track freshness. We are building
materialized views that use the snapshot IDs of other tables to track how
fresh the materialized view is.
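
As a rough sketch of the external-identifier idea, the snippet below models
snapshot summaries as plain Python dictionaries. The property name
"external-txn-id" and the helpers find_snapshot() and commit_once() are
hypothetical, chosen only for illustration; a real writer would also need the
check and the commit to be atomic with respect to other writers.

```python
TXN_KEY = "external-txn-id"  # hypothetical summary property name


def find_snapshot(history, txn_id):
    """Return the snapshot ID whose summary carries txn_id, or None if there is none."""
    for snap in history:
        if snap["summary"].get(TXN_KEY) == txn_id:
            return snap["snapshot_id"]
    return None


def commit_once(history, txn_id, files):
    """Commit files under txn_id unless a snapshot with that identifier already exists."""
    existing = find_snapshot(history, txn_id)
    if existing is not None:
        return existing  # a retry of an already-successful commit is a no-op
    snapshot_id = len(history) + 1
    history.append({
        "snapshot_id": snapshot_id,
        "summary": {TXN_KEY: txn_id},
        "files": list(files),
    })
    return snapshot_id


history = []
first = commit_once(history, "txn-0001", ["a.parquet"])
retry = commit_once(history, "txn-0001", ["a.parquet"])   # e.g. after a client timeout
second = commit_once(history, "txn-0002", ["b.parquet"])
print(first, retry, second, len(history))
```

The same lookup lets an incremental reader map a transaction identifier to
the exact snapshot ID to read from or to, without comparing timestamps.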

On Thu, Sep 10, 2020 at 3:03 PM Ashish Mehta <me...@gmail.com>
wrote:

> > This is better handled by attaching a global transaction-id (e.g. UUID
> that is monotonically increasing) to the snapshot metadata (iceberg allows
> adding this to the summary)
> I believe even if the client can provide metadata for a snapshot during
> commit operation, you never know if that's the same order in which the
> writers will commit (when you have multiple writers in the system).
>
> -Ashish
>
> On Thu, Sep 10, 2020 at 12:18 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> I should also add one more thing. The PR I linked to above is a good way
>> to introduce a clock, but it was pointed out in the sync that even if we
>> had a service that provided synchronized timestamps, there is no guarantee
>> that there isn't a race condition between committers getting timestamps and
>> then committing. So we would still have an out-of-order problem. It is best
>> not to rely on timestamps other than for inspecting tables to get a rough
>> idea of when a node committed.
>>
>> On Thu, Sep 10, 2020 at 12:14 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Thanks, Gautam! I think that's a good summary of the discussion.
>>>
>>> On Thu, Sep 10, 2020 at 11:56 AM Gautam <ga...@gmail.com> wrote:
>>>
>>>> Wanted to circle back on this thread. Linear timestamps were discussed
>>>> during the sync, and the conclusion was that timestamp-based incremental
>>>> reading is generally discouraged because it introduces correctness issues.
>>>> Even if a custom clock is available, keeping timestamps atomic and
>>>> monotonically increasing is going to be a problem for applications.
>>>> Enforcing this in Iceberg (by blocking out-of-order timestamps) can
>>>> introduce issues of its own, e.g. a client committing an erroneous
>>>> timestamp that is way in the future would block all other clients from
>>>> committing.
>>>>
>>>> This is better handled by attaching a global transaction-id (e.g. a UUID
>>>> that is monotonically increasing) to the snapshot metadata (Iceberg allows
>>>> adding this to the summary). The incremental read application can then use
>>>> the transaction-id as a key to the exact from/to snapshot-id to do
>>>> incremental reading.
>>>>
>>>> Hope I covered the points raised.
>>>>
>>>> Regards,
>>>> -Gautam.
>>>>
>>>> On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>>>>>
>>>>> Also, I want to point out John's recent PR that added a way to inject
>>>>> a Clock that is used for timestamp generation:
>>>>> https://github.com/apache/iceberg/pull/1389
>>>>>
>>>>> That fits nicely with the requirements here and would be an easy way
>>>>> to inject your own time, synchronized by an external service.
>>>>>
>>>>> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Quick question below about the proposed usage of the timestamp:
>>>>>>
>>>>>> On Sep 9, 2020, at 7:24 AM, Miao Wang <mi...@adobe.com.INVALID>
>>>>>> wrote:
>>>>>>
>>>>>> +1 to OpenInx's comment on implementation.
>>>>>>
>>>>>> Unless we have an external timing synchronization service and
>>>>>> require all clients to use it, timestamps of different clients are
>>>>>> not comparable.
>>>>>>
>>>>>>
>>>>>> Do we want to use the timestamp as the real timestamp of the last
>>>>>> change, or do we want to use it only as a monotonically increasing,
>>>>>> more human-readable identifier?
>>>>>> Do we want to compare this timestamp against some external source, or
>>>>>> do we just want to compare it with the timestamps of other snapshots
>>>>>> of the same table?
>>>>>>
>>>>>>
>>>>>> So, there are two asks: 1) whether to have a timestamp-based API for
>>>>>> delta reading; 2) how to enforce and implement a service/protocol for
>>>>>> timestamp sync among all clients.
>>>>>>
>>>>>> 1) +1 to have it, as Jingsong and Gautam suggested. Snapshot ID could
>>>>>> be the source of truth in any case.
>>>>>>
>>>>>> 2) IMO, it should be an external package to Iceberg.
>>>>>>
>>>>>> Miao
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Ashish Mehta <me...@gmail.com>.
> This is better handled by attaching a global transaction-id (e.g. UUID
> that is monotonically increasing) to the snapshot metadata (iceberg allows
> adding this to the summary)
I believe even if the client can provide metadata for a snapshot during
commit operation, you never know if that's the same order in which the
writers will commit (when you have multiple writers in the system).

-Ashish

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I should also add one more thing. The PR I linked to above is a good way to
introduce a clock, but it was pointed out in the sync that even if we had a
service that provided synchronized timestamps, there is no guarantee that
there isn't a race condition between committers getting timestamps and then
committing. So we would still have an out-of-order problem. It is best not
to rely on timestamps other than for inspecting tables to get a rough idea
of when a node committed.
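
The race described above can be shown with a toy model. This is a sketch in
plain Python under stated assumptions: itertools.count stands in for a
perfectly synchronized timestamp service, commit_order() is a hypothetical
helper, and each writer's "write_ticks" is the time it spends writing
metadata between taking its timestamp and committing.

```python
from itertools import count


def commit_order(writers):
    """writers: list of (name, write_ticks), in the order they request a timestamp.

    Returns (name, timestamp) pairs in the order the commits actually land.
    """
    clock = count(start=1)  # idealized shared clock: strictly increasing, no skew
    started = [(name, next(clock), write_ticks) for name, write_ticks in writers]
    # A commit lands when the metadata write finishes, not when the timestamp was taken.
    finished = sorted(started, key=lambda w: w[1] + w[2])
    return [(name, ts) for name, ts, _ in finished]


# writerA gets its timestamp first but takes longer to write its metadata.
log = commit_order([("writerA", 5), ("writerB", 1)])
timestamps = [ts for _, ts in log]
is_monotonic = all(a <= b for a, b in zip(timestamps, timestamps[1:]))
print(log, is_monotonic)
```

Even with a perfect clock, writerB's commit lands first with the later
timestamp, so the committed history is still out of order by timestamp.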

-- 
Ryan Blue
Software Engineer
Netflix

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Thanks, Gautam! I think that's a good summary of the discussion.

On Thu, Sep 10, 2020 at 11:56 AM Gautam <ga...@gmail.com> wrote:

> Wanted to circle back on this thread. Linear timestamps was discussed
> during the sync and the conclusion was that timestamp based incremental
> reading is generally discouraged as that introduces correctness issues.
> Even if a custom clock is available keeping timestamps atomic and
> monotonically increasing is going to be a problem for applications.
> Enforcing this in Iceberg (by blocking out-of-order timestamps) can allow
> potential issues e.g. a client committing an erroneous timestamp, that is
> way in the future, would block all other clients from committing.
>
> This is better handled by attaching a global transaction-id (e.g. UUID
> that is monotonically increasing) to the snapshot metadata (iceberg allows
> adding this to the summary). The incremental read application can then use
> the transaction-id as a key to the exact from/to snapshot-id to do
> incremental reading.
>
> Hope I covered the points raised.
>
> Regards,
> -Gautam.
>
> On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>>
>> Also, I want to point out John's recent PR that added a way to inject a
>> Clock that is used for timestamp generation:
>> https://github.com/apache/iceberg/pull/1389
>>
>> That fits nicely with the requirements here and would be an easy way to
>> inject your own time, synchronized by an external service.
>>
>> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
>> wrote:
>>
>>> Quick question below about the proposed usage of the timestamp:
>>>
>>> On Sep 9, 2020, at 7:24 AM, Miao Wang <mi...@adobe.com.INVALID> wrote:
>>>
>>> +1 Openlnx’s comment on implementation.
>>>
>>> Only if we have an external timing synchronization service and enforce
>>> all clients using the service, timestamps of different clients are not
>>> comparable.
>>>
>>>
>>> Do we want to use the timestamp as the real timestamp of the last
>>> change, or we want to use it only as a monotonously increasing more human
>>> readable identifier?
>>> Do we want to compare this timestamp against some external source, or we
>>> just want to compare this timestamp with other timestamps in the different
>>> snapshots of the same table?
>>>
>>>
>>> So, there are two asks: 1). Whether to have a timestamp based API for
>>> delta reading; 2). How to enforce and implement a service/protocol for
>>> timestamp sync among all clients.
>>>
>>> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could be
>>> source of truth in any cases.
>>>
>>> 2). IMO, it should be an external package to Iceberg.
>>>
>>> Miao
>>>
>>> *From: *OpenInx <op...@gmail.com>
>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>> *Date: *Tuesday, September 8, 2020 at 7:55 PM
>>> *To: *Iceberg Dev List <de...@iceberg.apache.org>
>>> *Subject: *Re: Timestamp Based Incremental Reading in Iceberg ...
>>>
>>> I agree that  it's helpful to allow users to read the incremental delta
>>> based timestamp,  as Jingsong said timestamp is more friendly.
>>>
>>> My question is how to implement this ?
>>>
>>>  If just attach the client's timestamp to the iceberg table when
>>> committing,  then different clients may have different timestamp values
>>> because of the skewing. In theory, these time values are not strictly
>>> comparable, and can only be compared within the margin of error.
>>>
>>>
>>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>> +1 for timestamps are linear, in implementation, maybe the writer only
>>> needs to look at the previous snapshot timestamp.
>>>
>>> We're trying to think of iceberg as a message queue, Let's take the
>>> popular queue Kafka as an example,
>>> Iceberg has snapshotId and timestamp, corresponding, Kafka has offset
>>> and timestamp:
>>> - offset: It is used for incremental read, such as the state of a
>>> checkpoint in a computing system.
>>> - timestamp: It is explicitly specified by the user to specify the scope
>>> of consumption. As start_timestamp of reading. Timestamp is a better user
>>> aware interface. But offset/snapshotId is not human readable and friendly.
>>>
>>> So there are scenarios where timestamp is used for incremental read.
>>>
>>> Best,
>>> Jingsong
>>>
>>>
>>> On Wed, Sep 9, 2020 at 12:45 AM Sud <su...@gmail.com> wrote:
>>>
>>>
>>> We are using incremental read for iceberg tables which gets quite few
>>> appends ( ~500- 1000 per hour) . but instead of using timestamp we use
>>> snapshot ids and track state of last read snapshot Id.
>>> We are using timestamp as fallback when the state is incorrect, but as
>>> you mentioned if timestamps are linear then it works as expected.
>>> We also found that incremental reader might be slow when dealing with >
>>> 2k snapshots in range. we are currently testing a manifest based
>>> incremental reader which looks at manifest entries instead of scanning
>>> snapshot history and accessing each snapshot.
>>>
>>> Is there any reason you can't use snapshot based incremental read?
>>>
>>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <ga...@gmail.com> wrote:
>>>
>>> Hello Devs,
>>>                    We are looking into adding workflows that read data
>>> incrementally based on commit time. The ability to read deltas between
>>> start / end commit timestamps on a table and ability to resume reading from
>>> last read end timestamp. In that regard, we need the timestamps to be
>>> linear in the current active snapshot history (newer versions always have
>>> higher timestamps). Although Iceberg commit flow ensures the versions are
>>> newer, there isn't a check to ensure timestamps are linear.
>>>
>>> Example flow, if two clients (clientA and clientB), whose time-clocks
>>> are slightly off (say by a couple seconds), are committing frequently,
>>> clientB might get to commit after clientA even if it's new snapshot
>>> timestamps is out of order. I might be wrong but I haven't found a check in
>>> HadoopTableOperations.commit() to ensure this above case does not happen.
>>>
>>>
>>> On the other hand, restricting commits due to out-of-order timestamps
>>> can hurt commit throughput so I can see why this isn't something Iceberg
>>> might want to enforce based on System.currentTimeMillis(). Although if
>>> clients had a way to define their own globally synchronized timestamps
>>> (using external service or some monotonically increasing UUID) then iceberg
>>> could allow an API to set that on the snapshot or use that instead of
>>> System.currentTimeMillis(). Iceberg exposes something similar using
>>> Sequence numbers in v2 format to track Deletes and Appends.
>>> Is this a concern others have? If so how are folks handling this today
>>> or are they not exposing such a feature at all due to the inherent
>>> distributed timing problem? Would like to hear how others are
>>> thinking/going about this. Thoughts?
>>>
>>> Cheers,
>>>
>>> -Gautam.
>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Gautam <ga...@gmail.com>.
Wanted to circle back on this thread. Linear timestamps were discussed
during the sync, and the conclusion was that timestamp-based incremental
reading is generally discouraged because it introduces correctness issues.
Even if a custom clock is available, keeping timestamps atomic and
monotonically increasing is going to be a problem for applications.
Enforcing this in Iceberg (by blocking out-of-order timestamps) can
introduce its own issues, e.g. a client committing an erroneous timestamp
that is way in the future would block all other clients from committing.

This is better handled by attaching a global transaction-id (e.g. a UUID
that is monotonically increasing) to the snapshot metadata (Iceberg allows
adding this to the summary). The incremental read application can then use
the transaction-id as a key to look up the exact from/to snapshot-id for
incremental reading.
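
To make the lookup concrete, here is a minimal sketch. It assumes a toy
in-memory snapshot list rather than the Iceberg API: the Snapshot class,
the "transaction-id" property name and the helper functions are all
hypothetical and only illustrate the idea.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

TXN_KEY = "transaction-id"  # hypothetical summary property name


@dataclass
class Snapshot:
    """Toy stand-in for a table snapshot; not the Iceberg class."""
    snapshot_id: int
    timestamp_ms: int
    summary: Dict[str, str] = field(default_factory=dict)


def snapshot_for_txn(history: List[Snapshot], txn_id: str) -> Optional[int]:
    """Return the id of the snapshot whose summary carries txn_id, if any."""
    for snap in history:
        if snap.summary.get(TXN_KEY) == txn_id:
            return snap.snapshot_id
    return None


def incremental_range(history: List[Snapshot],
                      from_txn: str, to_txn: str) -> Tuple[int, int]:
    """Map a from/to transaction-id pair to the exact from/to snapshot ids."""
    start = snapshot_for_txn(history, from_txn)
    end = snapshot_for_txn(history, to_txn)
    if start is None or end is None:
        raise KeyError("unknown transaction-id")
    return start, end


# History in commit order. The second commit carries an older wall-clock
# timestamp than the first (clock skew), but the lookup never reads it.
history = [
    Snapshot(7001, 1_599_600_005_000, {TXN_KEY: "txn-0001"}),
    Snapshot(7002, 1_599_600_003_000, {TXN_KEY: "txn-0002"}),
    Snapshot(7003, 1_599_600_009_000, {TXN_KEY: "txn-0003"}),
]

print(incremental_range(history, "txn-0001", "txn-0003"))
```

The reader persists the last transaction-id it consumed and resolves it to
a snapshot-id on resume, so the out-of-order timestamp on the second commit
has no effect on what gets read.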

Hope I covered the points raised.

Regards,
-Gautam.

On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>
> Also, I want to point out John's recent PR that added a way to inject a
> Clock that is used for timestamp generation:
> https://github.com/apache/iceberg/pull/1389
>
> That fits nicely with the requirements here and would be an easy way to
> inject your own time, synchronized by an external service.
>
> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
> wrote:
>
>> Quick question below about the proposed usage of the timestamp:
>>
>> On Sep 9, 2020, at 7:24 AM, Miao Wang <mi...@adobe.com.INVALID> wrote:
>>
>> +1 Openlnx’s comment on implementation.
>>
>> Only if we have an external timing synchronization service and enforce
>> all clients using the service, timestamps of different clients are not
>> comparable.
>>
>>
>> Do we want to use the timestamp as the real timestamp of the last change,
>> or we want to use it only as a monotonously increasing more human readable
>> identifier?
>> Do we want to compare this timestamp against some external source, or we
>> just want to compare this timestamp with other timestamps in the different
>> snapshots of the same table?
>>
>>
>> So, there are two asks: 1). Whether to have a timestamp based API for
>> delta reading; 2). How to enforce and implement a service/protocol for
>> timestamp sync among all clients.
>>
>> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could be
>> source of truth in any cases.
>>
>> 2). IMO, it should be an external package to Iceberg.
>>
>> Miao
>>
>> *From: *OpenInx <op...@gmail.com>
>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>> *Date: *Tuesday, September 8, 2020 at 7:55 PM
>> *To: *Iceberg Dev List <de...@iceberg.apache.org>
>> *Subject: *Re: Timestamp Based Incremental Reading in Iceberg ...
>>
>> I agree that  it's helpful to allow users to read the incremental delta
>> based timestamp,  as Jingsong said timestamp is more friendly.
>>
>> My question is how to implement this ?
>>
>>  If just attach the client's timestamp to the iceberg table when
>> committing,  then different clients may have different timestamp values
>> because of the skewing. In theory, these time values are not strictly
>> comparable, and can only be compared within the margin of error.
>>
>>
>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>> +1 for timestamps are linear, in implementation, maybe the writer only
>> needs to look at the previous snapshot timestamp.
>>
>> We're trying to think of iceberg as a message queue, Let's take the
>> popular queue Kafka as an example,
>> Iceberg has snapshotId and timestamp, corresponding, Kafka has offset and
>> timestamp:
>> - offset: It is used for incremental read, such as the state of a
>> checkpoint in a computing system.
>> - timestamp: It is explicitly specified by the user to specify the scope
>> of consumption. As start_timestamp of reading. Timestamp is a better user
>> aware interface. But offset/snapshotId is not human readable and friendly.
>>
>> So there are scenarios where timestamp is used for incremental read.
>>
>> Best,
>> Jingsong
>>
>>
>> On Wed, Sep 9, 2020 at 12:45 AM Sud <su...@gmail.com> wrote:
>>
>>
>> We are using incremental read for iceberg tables which gets quite few
>> appends ( ~500- 1000 per hour) . but instead of using timestamp we use
>> snapshot ids and track state of last read snapshot Id.
>> We are using timestamp as fallback when the state is incorrect, but as
>> you mentioned if timestamps are linear then it works as expected.
>> We also found that incremental reader might be slow when dealing with >
>> 2k snapshots in range. we are currently testing a manifest based
>> incremental reader which looks at manifest entries instead of scanning
>> snapshot history and accessing each snapshot.
>>
>> Is there any reason you can't use snapshot based incremental read?
>>
>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <ga...@gmail.com> wrote:
>>
>> Hello Devs,
>>                    We are looking into adding workflows that read data
>> incrementally based on commit time. The ability to read deltas between
>> start / end commit timestamps on a table and ability to resume reading from
>> last read end timestamp. In that regard, we need the timestamps to be
>> linear in the current active snapshot history (newer versions always have
>> higher timestamps). Although Iceberg commit flow ensures the versions are
>> newer, there isn't a check to ensure timestamps are linear.
>>
>> Example flow, if two clients (clientA and clientB), whose time-clocks are
>> slightly off (say by a couple seconds), are committing frequently, clientB
>> might get to commit after clientA even if it's new snapshot timestamps is
>> out of order. I might be wrong but I haven't found a check in
>> HadoopTableOperations.commit() to ensure this above case does not happen.
>>
>>
>> On the other hand, restricting commits due to out-of-order timestamps can
>> hurt commit throughput so I can see why this isn't something Iceberg might
>> want to enforce based on System.currentTimeMillis(). Although if clients
>> had a way to define their own globally synchronized timestamps (using
>> external service or some monotonically increasing UUID) then iceberg could
>> allow an API to set that on the snapshot or use that instead of
>> System.currentTimeMillis(). Iceberg exposes something similar using
>> Sequence numbers in v2 format to track Deletes and Appends.
>> Is this a concern others have? If so how are folks handling this today or
>> are they not exposing such a feature at all due to the inherent distributed
>> timing problem? Would like to hear how others are thinking/going about
>> this. Thoughts?
>>
>> Cheers,
>>
>> -Gautam.
>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Hi everyone, I'm putting this on the agenda for today's Iceberg sync.

Also, I want to point out John's recent PR that added a way to inject a
Clock that is used for timestamp generation:
https://github.com/apache/iceberg/pull/1389

That fits nicely with the requirements here and would be an easy way to
inject your own time, synchronized by an external service.
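
As a rough illustration of what an injected time source could do, here is a
small sketch in Python. It is not the hook added by the PR; the class name
and the millis() method are hypothetical. It only shows a wrapper that never
hands out a timestamp lower than or equal to one it already returned, and
it does so within a single process only.

```python
import time
from typing import Callable


class MonotonicMillisClock:
    """Wraps a millisecond time source and never returns a value that is
    less than or equal to one it already returned. Single process only,
    and not thread-safe; coordinating several clients would still need
    an external service."""

    def __init__(self, source: Callable[[], int]):
        self._source = source
        self._last = -1

    def millis(self) -> int:
        now = self._source()
        # If the source stalled or went backwards, step forward by 1 ms.
        self._last = max(now, self._last + 1)
        return self._last


def wall_clock_millis() -> int:
    """Default source: local wall-clock time in milliseconds."""
    return int(time.time() * 1000)


# A fake source that jumps backwards, standing in for a skewed clock.
readings = iter([1000, 1005, 1003, 1003, 1010])
clock = MonotonicMillisClock(lambda: next(readings))
print([clock.millis() for _ in range(5)])
```

With the fake readings above, the two backwards readings of 1003 are
replaced by 1006 and 1007, so the emitted sequence stays strictly
increasing.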

On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
wrote:

> Quick question below about the proposed usage of the timestamp:
>
> On Sep 9, 2020, at 7:24 AM, Miao Wang <mi...@adobe.com.INVALID> wrote:
>
> +1 Openlnx’s comment on implementation.
>
> Only if we have an external timing synchronization service and enforce all
> clients using the service, timestamps of different clients are not
> comparable.
>
>
> Do we want to use the timestamp as the real timestamp of the last change,
> or we want to use it only as a monotonously increasing more human readable
> identifier?
> Do we want to compare this timestamp against some external source, or we
> just want to compare this timestamp with other timestamps in the different
> snapshots of the same table?
>
>
> So, there are two asks: 1). Whether to have a timestamp based API for
> delta reading; 2). How to enforce and implement a service/protocol for
> timestamp sync among all clients.
>
> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could be
> source of truth in any cases.
>
> 2). IMO, it should be an external package to Iceberg.
>
> Miao
>
> *From: *OpenInx <op...@gmail.com>
> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
> *Date: *Tuesday, September 8, 2020 at 7:55 PM
> *To: *Iceberg Dev List <de...@iceberg.apache.org>
> *Subject: *Re: Timestamp Based Incremental Reading in Iceberg ...
>
> I agree that  it's helpful to allow users to read the incremental delta
> based timestamp,  as Jingsong said timestamp is more friendly.
>
> My question is how to implement this ?
>
>  If just attach the client's timestamp to the iceberg table when
> committing,  then different clients may have different timestamp values
> because of the skewing. In theory, these time values are not strictly
> comparable, and can only be compared within the margin of error.
>
>
> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <ji...@gmail.com>
> wrote:
>
> +1 for timestamps are linear, in implementation, maybe the writer only
> needs to look at the previous snapshot timestamp.
>
> We're trying to think of iceberg as a message queue, Let's take the
> popular queue Kafka as an example,
> Iceberg has snapshotId and timestamp, corresponding, Kafka has offset and
> timestamp:
> - offset: It is used for incremental read, such as the state of a
> checkpoint in a computing system.
> - timestamp: It is explicitly specified by the user to specify the scope
> of consumption. As start_timestamp of reading. Timestamp is a better user
> aware interface. But offset/snapshotId is not human readable and friendly.
>
> So there are scenarios where timestamp is used for incremental read.
>
> Best,
> Jingsong
>
>
> On Wed, Sep 9, 2020 at 12:45 AM Sud <su...@gmail.com> wrote:
>
>
> We are using incremental read for iceberg tables which gets quite few
> appends ( ~500- 1000 per hour) . but instead of using timestamp we use
> snapshot ids and track state of last read snapshot Id.
> We are using timestamp as fallback when the state is incorrect, but as you
> mentioned if timestamps are linear then it works as expected.
> We also found that incremental reader might be slow when dealing with > 2k
> snapshots in range. we are currently testing a manifest based incremental
> reader which looks at manifest entries instead of scanning snapshot history
> and accessing each snapshot.
>
> Is there any reason you can't use snapshot based incremental read?
>
> On Tue, Sep 8, 2020 at 9:06 AM Gautam <ga...@gmail.com> wrote:
>
> Hello Devs,
>                    We are looking into adding workflows that read data
> incrementally based on commit time. The ability to read deltas between
> start / end commit timestamps on a table and ability to resume reading from
> last read end timestamp. In that regard, we need the timestamps to be
> linear in the current active snapshot history (newer versions always have
> higher timestamps). Although Iceberg commit flow ensures the versions are
> newer, there isn't a check to ensure timestamps are linear.
>
> Example flow, if two clients (clientA and clientB), whose time-clocks are
> slightly off (say by a couple seconds), are committing frequently, clientB
> might get to commit after clientA even if it's new snapshot timestamps is
> out of order. I might be wrong but I haven't found a check in
> HadoopTableOperations.commit() to ensure this above case does not happen.
>
>
> On the other hand, restricting commits due to out-of-order timestamps can
> hurt commit throughput so I can see why this isn't something Iceberg might
> want to enforce based on System.currentTimeMillis(). Although if clients
> had a way to define their own globally synchronized timestamps (using
> external service or some monotonically increasing UUID) then iceberg could
> allow an API to set that on the snapshot or use that instead of
> System.currentTimeMillis(). Iceberg exposes something similar using
> Sequence numbers in v2 format to track Deletes and Appends.
> Is this a concern others have? If so how are folks handling this today or
> are they not exposing such a feature at all due to the inherent distributed
> timing problem? Would like to hear how others are thinking/going about
> this. Thoughts?
>
> Cheers,
>
> -Gautam.
>
>
>
> --
> Best, Jingsong Lee
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
Quick question below about the proposed usage of the timestamp:

> On Sep 9, 2020, at 7:24 AM, Miao Wang <mi...@adobe.com.INVALID> wrote:
> 
> +1 Openlnx’s comment on implementation.
>  
> Only if we have an external timing synchronization service and enforce all clients using the service, timestamps of different clients are not comparable.

Do we want to use the timestamp as the real timestamp of the last change, or do we want to use it only as a monotonically increasing, more human-readable identifier?
Do we want to compare this timestamp against some external source, or do we just want to compare it with the timestamps of other snapshots of the same table?

>  
> So, there are two asks: 1). Whether to have a timestamp based API for delta reading; 2). How to enforce and implement a service/protocol for timestamp sync among all clients.
>  
> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could be source of truth in any cases.
>  
> 2). IMO, it should be an external package to Iceberg.
>  
> Miao  
>  
> From: OpenInx <op...@gmail.com>
> Reply-To: "dev@iceberg.apache.org" <de...@iceberg.apache.org>
> Date: Tuesday, September 8, 2020 at 7:55 PM
> To: Iceberg Dev List <de...@iceberg.apache.org>
> Subject: Re: Timestamp Based Incremental Reading in Iceberg ...
>  
> I agree that  it's helpful to allow users to read the incremental delta based timestamp,  as Jingsong said timestamp is more friendly. 
>  
> My question is how to implement this ? 
>  
>  If just attach the client's timestamp to the iceberg table when committing,  then different clients may have different timestamp values because of the skewing. In theory, these time values are not strictly comparable, and can only be compared within the margin of error. 
>  
>  
> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsonglee0@gmail.com <ma...@gmail.com>> wrote:
>> +1 for timestamps are linear, in implementation, maybe the writer only needs to look at the previous snapshot timestamp.
>>  
>> We're trying to think of iceberg as a message queue, Let's take the popular queue Kafka as an example,
>> Iceberg has snapshotId and timestamp, corresponding, Kafka has offset and timestamp:
>> - offset: It is used for incremental read, such as the state of a checkpoint in a computing system.
>> - timestamp: It is explicitly specified by the user to specify the scope of consumption. As start_timestamp of reading. Timestamp is a better user aware interface. But offset/snapshotId is not human readable and friendly.
>>  
>> So there are scenarios where timestamp is used for incremental read.
>>  
>> Best,
>> Jingsong
>>  
>>  
>> On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2014@gmail.com <ma...@gmail.com>> wrote:
>>>  
>>> We are using incremental read for iceberg tables which gets quite few appends ( ~500- 1000 per hour) . but instead of using timestamp we use snapshot ids and track state of last read snapshot Id. 
>>> We are using timestamp as fallback when the state is incorrect, but as you mentioned if timestamps are linear then it works as expected.
>>> We also found that incremental reader might be slow when dealing with > 2k snapshots in range. we are currently testing a manifest based incremental reader which looks at manifest entries instead of scanning snapshot history and accessing each snapshot.
>>>  
>>> Is there any reason you can't use snapshot based incremental read? 
>>>  
>>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkowshik@gmail.com <ma...@gmail.com>> wrote:
>>>> Hello Devs, 
>>>>                    We are looking into adding workflows that read data incrementally based on commit time. The ability to read deltas between start / end commit timestamps on a table and ability to resume reading from last read end timestamp. In that regard, we need the timestamps to be linear in the current active snapshot history (newer versions always have higher timestamps). Although Iceberg commit flow ensures the versions are newer, there isn't a check to ensure timestamps are linear. 
>>>>  
>>>> Example flow, if two clients (clientA and clientB), whose time-clocks are slightly off (say by a couple seconds), are committing frequently, clientB might get to commit after clientA even if it's new snapshot timestamps is out of order. I might be wrong but I haven't found a check in HadoopTableOperations.commit() to ensure this above case does not happen.
>>>>  
>>>> On the other hand, restricting commits due to out-of-order timestamps can hurt commit throughput so I can see why this isn't something Iceberg might want to enforce based on System.currentTimeMillis(). Although if clients had a way to define their own globally synchronized timestamps (using external service or some monotonically increasing UUID) then iceberg could allow an API to set that on the snapshot or use that instead of System.currentTimeMillis(). Iceberg exposes something similar using Sequence numbers in v2 format to track Deletes and Appends.
>>>> 
>>>> Is this a concern others have? If so how are folks handling this today or are they not exposing such a feature at all due to the inherent distributed timing problem? Would like to hear how others are thinking/going about this. Thoughts?
>>>> 
>>>> Cheers,
>>>>  
>>>> -Gautam.
>> 
>> 
>>  
>> -- 
>> Best, Jingsong Lee


Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Miao Wang <mi...@adobe.com.INVALID>.
+1 to OpenInx's comment on implementation.

Unless we have an external time synchronization service and require all clients to use it, timestamps from different clients are not comparable.

So, there are two asks: 1). Whether to have a timestamp-based API for delta reading; 2). How to enforce and implement a service/protocol for timestamp sync among all clients.

1). +1 to have it, as Jingsong and Gautam suggested. The snapshot ID could be the source of truth in any case.

2). IMO, it should be an external package to Iceberg.

Miao

From: OpenInx <op...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <de...@iceberg.apache.org>
Date: Tuesday, September 8, 2020 at 7:55 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Timestamp Based Incremental Reading in Iceberg ...

I agree that  it's helpful to allow users to read the incremental delta based timestamp,  as Jingsong said timestamp is more friendly.

My question is how to implement this ?

 If just attach the client's timestamp to the iceberg table when committing,  then different clients may have different timestamp values because of the skewing. In theory, these time values are not strictly comparable, and can only be compared within the margin of error.


On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <ji...@gmail.com> wrote:
+1 for timestamps are linear, in implementation, maybe the writer only needs to look at the previous snapshot timestamp.

We're trying to think of iceberg as a message queue, Let's take the popular queue Kafka as an example,
Iceberg has snapshotId and timestamp, corresponding, Kafka has offset and timestamp:
- offset: It is used for incremental read, such as the state of a checkpoint in a computing system.
- timestamp: It is explicitly specified by the user to specify the scope of consumption. As start_timestamp of reading. Timestamp is a better user aware interface. But offset/snapshotId is not human readable and friendly.

So there are scenarios where timestamp is used for incremental read.

Best,
Jingsong


On Wed, Sep 9, 2020 at 12:45 AM Sud <su...@gmail.com> wrote:

We are using incremental read for iceberg tables which gets quite few appends ( ~500- 1000 per hour) . but instead of using timestamp we use snapshot ids and track state of last read snapshot Id.
We are using timestamp as fallback when the state is incorrect, but as you mentioned if timestamps are linear then it works as expected.
We also found that incremental reader might be slow when dealing with > 2k snapshots in range. we are currently testing a manifest based incremental reader which looks at manifest entries instead of scanning snapshot history and accessing each snapshot.

Is there any reason you can't use snapshot based incremental read?

On Tue, Sep 8, 2020 at 9:06 AM Gautam <ga...@gmail.com> wrote:
Hello Devs,
                   We are looking into adding workflows that read data incrementally based on commit time. The ability to read deltas between start / end commit timestamps on a table and ability to resume reading from last read end timestamp. In that regard, we need the timestamps to be linear in the current active snapshot history (newer versions always have higher timestamps). Although Iceberg commit flow ensures the versions are newer, there isn't a check to ensure timestamps are linear.

Example flow, if two clients (clientA and clientB), whose time-clocks are slightly off (say by a couple seconds), are committing frequently, clientB might get to commit after clientA even if it's new snapshot timestamps is out of order. I might be wrong but I haven't found a check in HadoopTableOperations.commit() to ensure this above case does not happen.

On the other hand, restricting commits due to out-of-order timestamps can hurt commit throughput so I can see why this isn't something Iceberg might want to enforce based on System.currentTimeMillis(). Although if clients had a way to define their own globally synchronized timestamps (using external service or some monotonically increasing UUID) then iceberg could allow an API to set that on the snapshot or use that instead of System.currentTimeMillis(). Iceberg exposes something similar using Sequence numbers in v2 format to track Deletes and Appends.
Is this a concern others have? If so how are folks handling this today or are they not exposing such a feature at all due to the inherent distributed timing problem? Would like to hear how others are thinking/going about this. Thoughts?

Cheers,

-Gautam.


--
Best, Jingsong Lee

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by OpenInx <op...@gmail.com>.
I agree that it's helpful to allow users to read the incremental delta
based on timestamp; as Jingsong said, timestamps are more user-friendly.

My question is how to implement this.

If we just attach the client's timestamp to the Iceberg table when
committing, then different clients may have different timestamp values
because of clock skew. In theory, these time values are not strictly
comparable, and can only be compared within the margin of error.
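
To make "compared within the margin of error" concrete, here is a minimal
Java sketch. It is not Iceberg code: the class name SkewAwareComparator and
the maxSkewMillis bound (the largest difference assumed between any two
clients' clocks) are hypothetical. Two commit timestamps are ordered only
when they differ by more than that bound; otherwise the order is reported
as unknown.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of Iceberg: orders commit timestamps that
// were produced by different clients whose clocks may disagree.
final class SkewAwareComparator {
    // Assumed upper bound on the difference between any two client clocks.
    private final long maxSkewMillis;

    SkewAwareComparator(long maxSkewMillis) {
        if (maxSkewMillis < 0) {
            throw new IllegalArgumentException("maxSkewMillis must be >= 0");
        }
        this.maxSkewMillis = maxSkewMillis;
    }

    // Returns -1 if tsA is definitely earlier, 1 if definitely later, and
    // empty if the two values are within the margin of error.
    OptionalInt compare(long tsA, long tsB) {
        if (Math.abs(tsA - tsB) <= maxSkewMillis) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(tsA < tsB ? -1 : 1);
    }
}
```

A reader that gets an empty result cannot tell which commit happened first
from the timestamps alone and has to fall back to the commit order.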



Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Jingsong Li <ji...@gmail.com>.
+1 for making timestamps linear. In the implementation, maybe the writer only
needs to look at the previous snapshot's timestamp.
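
A rough Java sketch of that idea, assuming commits are already serialized
by the table's atomic commit and that the writer can read the previous
snapshot's timestamp at commit time. MonotonicCommitClock and
nextTimestampMillis are hypothetical names, not an Iceberg API.

```java
// Hypothetical helper, not part of Iceberg: picks a timestamp for the new
// snapshot that is always strictly greater than the previous snapshot's.
final class MonotonicCommitClock {
    private MonotonicCommitClock() {}

    static long nextTimestampMillis(long previousSnapshotMillis, long clientNowMillis) {
        // Use the client's clock when it is ahead of the previous snapshot;
        // otherwise step one millisecond past the previous snapshot.
        return Math.max(clientNowMillis, previousSnapshotMillis + 1);
    }
}
```

One trade-off to keep in mind: a client whose clock runs fast pushes the
table's timestamps ahead of wall-clock time for every later writer.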

We're trying to think of Iceberg as a message queue. Take the popular
queue Kafka as an example: Iceberg has snapshotId and timestamp;
correspondingly, Kafka has offset and timestamp:
- offset: used for incremental reads, for example as the state stored in a
checkpoint of a computing system.
- timestamp: explicitly specified by the user to define the scope of
consumption, for example as the start_timestamp of a read. A timestamp is a
more user-friendly interface, whereas an offset/snapshotId is not
human-readable.

So there are scenarios where a timestamp is used for incremental reads.
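
As a sketch of how the two could work together, the Java below resolves a
user-supplied start timestamp to a snapshot ID once, after which a reader
could track progress by snapshot ID. Kafka's consumer API offers a similar
timestamp-to-offset lookup (offsetsForTimes). The SnapshotLookup and Entry
classes are hypothetical stand-ins for the table's snapshot log, not
Iceberg classes.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model of a snapshot log; not an Iceberg class.
final class SnapshotLookup {
    static final class Entry {
        final long snapshotId;
        final long timestampMillis;

        Entry(long snapshotId, long timestampMillis) {
            this.snapshotId = snapshotId;
            this.timestampMillis = timestampMillis;
        }
    }

    private SnapshotLookup() {}

    // history must be in commit order, oldest first. Returns the first
    // snapshot in commit order whose timestamp is at or after startMillis.
    // A linear scan is used on purpose: binary search would only be valid
    // if timestamps were guaranteed to be non-decreasing.
    static OptionalLong firstSnapshotAtOrAfter(List<Entry> history, long startMillis) {
        for (Entry e : history) {
            if (e.timestampMillis >= startMillis) {
                return OptionalLong.of(e.snapshotId);
            }
        }
        return OptionalLong.empty();
    }
}
```

If timestamps are not linear, snapshots committed after the returned one
may still carry timestamps earlier than startMillis, which is exactly the
ambiguity this thread is about.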

Best,
Jingsong



-- 
Best, Jingsong Lee

Re: Timestamp Based Incremental Reading in Iceberg ...

Posted by Sud <su...@gmail.com>.
We are using incremental reads for Iceberg tables that get quite a few
appends (~500-1000 per hour), but instead of using timestamps we use
snapshot IDs and track the last-read snapshot ID as state.
We use the timestamp as a fallback when the state is incorrect; as you
mentioned, it works as expected as long as timestamps are linear.
We also found that the incremental reader can be slow when dealing with
more than 2k snapshots in range. We are currently testing a manifest-based
incremental reader, which looks at manifest entries instead of scanning the
snapshot history and accessing each snapshot.

Is there any reason you can't use snapshot-based incremental reads?
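
For illustration, a minimal Java sketch of the snapshot-ID approach: walk
the parent chain from the current snapshot back to the last-read one and
process what lies in between. It uses a plain map as a hypothetical
stand-in for table metadata (IncrementalRange and parentOf are not Iceberg
names). The walk visits every snapshot in the range, which hints at why a
range with thousands of snapshots gets slow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model, not Iceberg code: parentOf maps a snapshot ID to its
// parent's ID; the first snapshot has no entry.
final class IncrementalRange {
    private IncrementalRange() {}

    // Returns the snapshot IDs committed after lastReadId up to and
    // including currentId, oldest first. Returns empty when lastReadId is
    // not an ancestor of currentId, i.e. the saved state is incorrect and
    // the caller needs a fallback.
    static Optional<List<Long>> snapshotsAfter(
            Map<Long, Long> parentOf, long lastReadId, long currentId) {
        Deque<Long> oldestFirst = new ArrayDeque<>();
        Long cursor = currentId;
        while (cursor != null && cursor.longValue() != lastReadId) {
            oldestFirst.push(cursor);        // older snapshots end up in front
            cursor = parentOf.get(cursor);   // null once the root is passed
        }
        if (cursor == null) {
            return Optional.empty();
        }
        return Optional.of(new ArrayList<>(oldestFirst));
    }
}
```

The returned list's last element would become the new last-read snapshot
ID once its data has been processed.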
