You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hive.apache.org by "Bhargav Bipinchandra Naik (Seller Platform-BLR)" <bh...@flipkart.com> on 2019/05/06 13:23:17 UTC

Consuming delta from Hive tables

We have a scenario where we want to consume only delta updates from Hive
tables.
- Multiple producers are updating data in Hive table
- Multiple consumer reading data from the Hive table

Consumption pattern:
- Get all data that has been updated since last time I read.

Is there any mechanism in Hive 3.0 which can enable above consumption
pattern?

I see there is a construct of row__id(writeid, bucketid, rowid) in ACID
tables.
- Can row__id be used in this scenario?
- How is the "writeid" generated?
- Is there some meta information which captures the time when the rows were
actually visible for read?

Re: Consuming delta from Hive tables

Posted by Jesus Camacho Rodriguez <jc...@apache.org>.
Hi Bhargav,

We solve a similar problem for incremental maintenance
<https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentMaterializationRule.java>
for materialized views.

row__id.writeid can be used for that scenario indeed. You just need to know
the current snapshot of the system at reading time (<high_watermark, list
of open transactions>). Then you just need to add a filter operator on top
of that table, making explicit the data contained in it. The filter will
roughly take the form ROW_ID.writeid <= high_watermark and ROW_ID.writeid
not in (open/invalid_ids). Information about how "writeid" is generated can
be found in https://jira.apache.org/jira/browse/HIVE-18192 .

Note that when source tables are not append only and update/delete record
operations may have been executed over them, problem becomes trickier since
currently there is no way to retrieve update/delete records from the delta
files (contributions would be welcome).

Cheers,
Jesús


On Mon, May 6, 2019 at 6:23 AM Bhargav Bipinchandra Naik (Seller
Platform-BLR) <bh...@flipkart.com> wrote:

> We have a scenario where we want to consume only delta updates from Hive
> tables.
> - Multiple producers are updating data in Hive table
> - Multiple consumer reading data from the Hive table
>
> Consumption pattern:
> - Get all data that has been updated since last time I read.
>
> Is there any mechanism in Hive 3.0 which can enable above consumption
> pattern?
>
> I see there is a construct of row__id(writeid, bucketid, rowid) in ACID
> tables.
> - Can row__id be used in this scenario?
> - How is the "writeid" generated?
> - Is there some meta information which captures the time when the rows
> were actually visible for read?
>

Re: Consuming delta from Hive tables

Posted by Alan Gates <al...@gmail.com>.
On Sun, May 19, 2019 at 11:21 PM Bhargav Bipinchandra Naik (Seller
Platform-BLR) <bh...@flipkart.com> wrote:

> Hi Alan,
>
>
> Are write_ids monotonically increasing?
>
They are assigned monotonically, but the transactions they are a part of
may commit at different times, so you can't use it as a low water mark.
That is, if you looked at the state of the table at time t1 and say that
write_id1 and write_id3 had been committed, it does not mean that there
won't be a write_id2 the next time you look, as the transaction for
write_id2 could have started before the transaction for write_id3 but
finished after.


> Are write_ids accessible in the hive query?
>
For e.g.:
> select * from table_name where write_id > N;
>
No.  For full ACID (ORC) tables the write_id is part of a pseudo-column
struct called row__id (not to be confused with the row_id mentioned before,
sorry we overloaded the term).  For insert only ACID (Non-ORC tables) the
write id is inferred from the filename.  In both cases the metastore
doesn't know about these columns, and thus I believe will fail the query
saying "no such column".

Alan.

>
> Basically I am trying to understand if I can use write_id to consume only
> updated rows.
> Store the maximum write_id(X) seen in the result and next time query for
> all rows with row_id greater than X.
>
> Thanks,
> Bhargav
>
> On Fri, May 17, 2019 at 10:37 PM Alan Gates <al...@gmail.com> wrote:
>
>> Sorry, looks like you sent this earlier and I missed it.
>>
>> A couple of things.  One, write_id is per transaction per table.  So for
>> table T, all rows written in w1 will have the same write_id, though they
>> will each have their own monotonically increasing row_ids.  Row_ids are
>> scoped by a write_id, so if both w1 and w2 insert a 100 rows, w1 would have
>> write_id 1, and row_ids 0-99 while w2's rows would have write_id 2 and
>> row_ids 0-99.
>>
>> Two, If w1 and w2 both attempted to update or delete (not insert) records
>> from the same partition of table T, then w1 would fail at commit time
>> because it would see that w2 had already committed and there's a possible
>> conflict.  This avoids lost updates and deleted records magically
>> reappearing.
>>
>> Alan.
>>
>> On Fri, May 17, 2019 at 4:44 AM Bhargav Bipinchandra Naik (Seller
>> Platform-BLR) <bh...@flipkart.com> wrote:
>>
>>> Is the following scenario supported?
>>>
>>> *timestamp:* t1 < t2 < t3 < t4 < t5 < t6
>>>
>>> *w1 -* transaction which updates subset of rows in table T {start_time:
>>> t1, end_time: t5}
>>> *w2 -* transaction which updates subset of rows in table T {start_time:
>>> t2, end_time: t3}
>>> *r1 - *job which reads rows from table T {start_time: t4}
>>> *r2 - *job which reads rows from table T {start_time: t6}
>>>
>>> - Is the write_id strictly increasing number across rows?
>>> - Is the write_id a version number per row and not a global construct?
>>> - Will the subset of rows updated by w1 have write_ids greater than
>>> write_ids of row updated by w2?
>>>
>>> Say if job r1 consumed the data at t4 had maximum write_id 100.
>>> Will rows updated by job w1 (end_time: t5) always have write_id > 100?
>>>
>>> Basically I need some kind of checkpoint using which the next run of the
>>> read job can read only the data updated since the checkpoint.
>>>
>>> Thanks,
>>> -Bhargav
>>>
>>>
>>>
>>>
>>>

Re: Consuming delta from Hive tables

Posted by "Bhargav Bipinchandra Naik (Seller Platform-BLR)" <bh...@flipkart.com>.
Hi Alan,


Are write_ids monotonically increasing?
Are write_ids accessible in the hive query?
For e.g.:
select * from table_name where write_id > N;

Basically I am trying to understand if I can use write_id to consume only
updated rows.
Store the maximum write_id(X) seen in the result and next time query for
all rows with row_id greater than X.

Thanks,
Bhargav

On Fri, May 17, 2019 at 10:37 PM Alan Gates <al...@gmail.com> wrote:

> Sorry, looks like you sent this earlier and I missed it.
>
> A couple of things.  One, write_id is per transaction per table.  So for
> table T, all rows written in w1 will have the same write_id, though they
> will each have their own monotonically increasing row_ids.  Row_ids are
> scoped by a write_id, so if both w1 and w2 insert a 100 rows, w1 would have
> write_id 1, and row_ids 0-99 while w2's rows would have write_id 2 and
> row_ids 0-99.
>
> Two, If w1 and w2 both attempted to update or delete (not insert) records
> from the same partition of table T, then w1 would fail at commit time
> because it would see that w2 had already committed and there's a possible
> conflict.  This avoids lost updates and deleted records magically
> reappearing.
>
> Alan.
>
> On Fri, May 17, 2019 at 4:44 AM Bhargav Bipinchandra Naik (Seller
> Platform-BLR) <bh...@flipkart.com> wrote:
>
>> Is the following scenario supported?
>>
>> *timestamp:* t1 < t2 < t3 < t4 < t5 < t6
>>
>> *w1 -* transaction which updates subset of rows in table T {start_time:
>> t1, end_time: t5}
>> *w2 -* transaction which updates subset of rows in table T {start_time:
>> t2, end_time: t3}
>> *r1 - *job which reads rows from table T {start_time: t4}
>> *r2 - *job which reads rows from table T {start_time: t6}
>>
>> - Is the write_id strictly increasing number across rows?
>> - Is the write_id a version number per row and not a global construct?
>> - Will the subset of rows updated by w1 have write_ids greater than
>> write_ids of row updated by w2?
>>
>> Say if job r1 consumed the data at t4 had maximum write_id 100.
>> Will rows updated by job w1 (end_time: t5) always have write_id > 100?
>>
>> Basically I need some kind of checkpoint using which the next run of the
>> read job can read only the data updated since the checkpoint.
>>
>> Thanks,
>> -Bhargav
>>
>>
>>
>>
>>

Re: Consuming delta from Hive tables

Posted by Alan Gates <al...@gmail.com>.
Sorry, looks like you sent this earlier and I missed it.

A couple of things.  One, write_id is per transaction per table.  So for
table T, all rows written in w1 will have the same write_id, though they
will each have their own monotonically increasing row_ids.  Row_ids are
scoped by a write_id, so if both w1 and w2 insert a 100 rows, w1 would have
write_id 1, and row_ids 0-99 while w2's rows would have write_id 2 and
row_ids 0-99.

Two, If w1 and w2 both attempted to update or delete (not insert) records
from the same partition of table T, then w1 would fail at commit time
because it would see that w2 had already committed and there's a possible
conflict.  This avoids lost updates and deleted records magically
reappearing.

Alan.

On Fri, May 17, 2019 at 4:44 AM Bhargav Bipinchandra Naik (Seller
Platform-BLR) <bh...@flipkart.com> wrote:

> Is the following scenario supported?
>
> *timestamp:* t1 < t2 < t3 < t4 < t5 < t6
>
> *w1 -* transaction which updates subset of rows in table T {start_time:
> t1, end_time: t5}
> *w2 -* transaction which updates subset of rows in table T {start_time:
> t2, end_time: t3}
> *r1 - *job which reads rows from table T {start_time: t4}
> *r2 - *job which reads rows from table T {start_time: t6}
>
> - Is the write_id strictly increasing number across rows?
> - Is the write_id a version number per row and not a global construct?
> - Will the subset of rows updated by w1 have write_ids greater than
> write_ids of row updated by w2?
>
> Say if job r1 consumed the data at t4 had maximum write_id 100.
> Will rows updated by job w1 (end_time: t5) always have write_id > 100?
>
> Basically I need some kind of checkpoint using which the next run of the
> read job can read only the data updated since the checkpoint.
>
> Thanks,
> -Bhargav
>
>
>
>
>

Re: Consuming delta from Hive tables

Posted by "Bhargav Bipinchandra Naik (Seller Platform-BLR)" <bh...@flipkart.com>.
Is the following scenario supported?

*timestamp:* t1 < t2 < t3 < t4 < t5 < t6

*w1 -* transaction which updates subset of rows in table T {start_time: t1,
end_time: t5}
*w2 -* transaction which updates subset of rows in table T {start_time: t2,
end_time: t3}
*r1 - *job which reads rows from table T {start_time: t4}
*r2 - *job which reads rows from table T {start_time: t6}

- Is the write_id strictly increasing number across rows?
- Is the write_id a version number per row and not a global construct?
- Will the subset of rows updated by w1 have write_ids greater than
write_ids of row updated by w2?

Say if job r1 consumed the data at t4 had maximum write_id 100.
Will rows updated by job w1 (end_time: t5) always have write_id > 100?

Basically I need some kind of checkpoint using which the next run of the
read job can read only the data updated since the checkpoint.

Thanks,
-Bhargav



On Tue, May 7, 2019 at 6:02 PM Bhargav Bipinchandra Naik (Seller
Platform-BLR) <bh...@flipkart.com> wrote:

> Hi Jesus and Alan,
>
> Thanks for the prompt reply.
> Had a follow up question.
>
> *timestamp:* t1 < t2 < t3 < t4 < t5 < t6
>
> *w1 -* transaction which updates subset of rows in table T {start_time:
> t1, end_time: t5}
> *w2 -* transaction which updates subset of rows in table T {start_time:
> t2, end_time: t3}
> *r1 - *job which reads rows from table T {start_time: t4}
> *r2 - *job which reads rows from table T {start_time: t6}
>
> - Is the write_id strictly increasing number across rows?
> - Is the write_id a version number per row and not a global construct?
> - Will the subset of rows updated by c1 have write_ids greater than
> write_ids of row updated by c2?
>
> Say if job r1 consumed the data at t4 had maximum write_id 100.
> Will rows updated by job w1 (end_time: t5) always have write_id > 100?
>
> Basically I need some kind of checkpoint using which the next run of the
> read job can read only the data updated since the checkpoint.
>
> Thanks,
> Bhargav
>
> On Mon, May 6, 2019 at 11:39 PM Alan Gates <al...@gmail.com> wrote:
>
>> The other issue is an external system has no ability to control when the
>> compactor is run (it rewrites deltas into the base files and thus erases
>> intermediate states that would interest you).  The mapping of writeids
>> (table specific) to transaction ids (system wide) is also cleaned
>> intermittently, again erasing history.  And there's no way to get the
>> mapping from writeids to transaction ids from outside of Hive.
>>
>> Alan.
>>
>> On Mon, May 6, 2019 at 6:23 AM Bhargav Bipinchandra Naik (Seller
>> Platform-BLR) <bh...@flipkart.com> wrote:
>>
>>> We have a scenario where we want to consume only delta updates from Hive
>>> tables.
>>> - Multiple producers are updating data in Hive table
>>> - Multiple consumer reading data from the Hive table
>>>
>>> Consumption pattern:
>>> - Get all data that has been updated since last time I read.
>>>
>>> Is there any mechanism in Hive 3.0 which can enable above consumption
>>> pattern?
>>>
>>> I see there is a construct of row__id(writeid, bucketid, rowid) in ACID
>>> tables.
>>> - Can row__id be used in this scenario?
>>> - How is the "writeid" generated?
>>> - Is there some meta information which captures the time when the rows
>>> were actually visible for read?
>>>
>>

Re: Consuming delta from Hive tables

Posted by "Bhargav Bipinchandra Naik (Seller Platform-BLR)" <bh...@flipkart.com>.
Hi Jesus and Alan,

Thanks for the prompt reply.
Had a follow up question.

*timestamp:* t1 < t2 < t3 < t4 < t5 < t6

*w1 -* transaction which updates subset of rows in table T {start_time: t1,
end_time: t5}
*w2 -* transaction which updates subset of rows in table T {start_time: t2,
end_time: t3}
*r1 - *job which reads rows from table T {start_time: t4}
*r2 - *job which reads rows from table T {start_time: t6}

- Is the write_id strictly increasing number across rows?
- Is the write_id a version number per row and not a global construct?
- Will the subset of rows updated by c1 have write_ids greater than
write_ids of row updated by c2?

Say if job r1 consumed the data at t4 had maximum write_id 100.
Will rows updated by job w1 (end_time: t5) always have write_id > 100?

Basically I need some kind of checkpoint using which the next run of the
read job can read only the data updated since the checkpoint.

Thanks,
Bhargav

On Mon, May 6, 2019 at 11:39 PM Alan Gates <al...@gmail.com> wrote:

> The other issue is an external system has no ability to control when the
> compactor is run (it rewrites deltas into the base files and thus erases
> intermediate states that would interest you).  The mapping of writeids
> (table specific) to transaction ids (system wide) is also cleaned
> intermittently, again erasing history.  And there's no way to get the
> mapping from writeids to transaction ids from outside of Hive.
>
> Alan.
>
> On Mon, May 6, 2019 at 6:23 AM Bhargav Bipinchandra Naik (Seller
> Platform-BLR) <bh...@flipkart.com> wrote:
>
>> We have a scenario where we want to consume only delta updates from Hive
>> tables.
>> - Multiple producers are updating data in Hive table
>> - Multiple consumer reading data from the Hive table
>>
>> Consumption pattern:
>> - Get all data that has been updated since last time I read.
>>
>> Is there any mechanism in Hive 3.0 which can enable above consumption
>> pattern?
>>
>> I see there is a construct of row__id(writeid, bucketid, rowid) in ACID
>> tables.
>> - Can row__id be used in this scenario?
>> - How is the "writeid" generated?
>> - Is there some meta information which captures the time when the rows
>> were actually visible for read?
>>
>

Re: Consuming delta from Hive tables

Posted by Alan Gates <al...@gmail.com>.
The other issue is an external system has no ability to control when the
compactor is run (it rewrites deltas into the base files and thus erases
intermediate states that would interest you).  The mapping of writeids
(table specific) to transaction ids (system wide) is also cleaned
intermittently, again erasing history.  And there's no way to get the
mapping from writeids to transaction ids from outside of Hive.

Alan.

On Mon, May 6, 2019 at 6:23 AM Bhargav Bipinchandra Naik (Seller
Platform-BLR) <bh...@flipkart.com> wrote:

> We have a scenario where we want to consume only delta updates from Hive
> tables.
> - Multiple producers are updating data in Hive table
> - Multiple consumer reading data from the Hive table
>
> Consumption pattern:
> - Get all data that has been updated since last time I read.
>
> Is there any mechanism in Hive 3.0 which can enable above consumption
> pattern?
>
> I see there is a construct of row__id(writeid, bucketid, rowid) in ACID
> tables.
> - Can row__id be used in this scenario?
> - How is the "writeid" generated?
> - Is there some meta information which captures the time when the rows
> were actually visible for read?
>