You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hudi.apache.org by Danny Chan <da...@apache.org> on 2021/03/31 10:24:29 UTC

[DISCUSS] Incremental computation pipeline for HUDI

Hi dear HUDI community ~ Here i want to fire a discuss about using HUDI as
the unified storage/format for data warehouse/lake incremental computation.

Usually people divide data warehouse production into several levels, such
as the ODS(operation data store), DWD(data warehouse details), DWS(data
warehouse service), ADS(application data service).


ODS -> DWD -> DWS -> ADS

In the NEAR-REAL-TIME (or pure realtime) computation cases, a big topic is
syncing the change log(CDC pattern) from all kinds of RDBMS into the
warehouse/lake, the cdc patten records and propagate the change flag:
insert, update(before and after) and delete for the consumer, with these
flags, the downstream engines can have a realtime accumulation computation.

Using streaming engine like Flink, we can have a totally NEAR-REAL-TIME
computation pipeline for each of the layer.

If HUDI can keep and propagate these change flags to its consumers, we can
use HUDI as the unified format for the pipeline.

I'm expecting your nice ideas here ~

Best,
Danny Chan

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Vinoth Chandar <vi...@apache.org>.
Hi Danny,

Thanks, I will review this asap. Already, in the "review in progress"
column.

Thanks
Vinoth

On Thu, Apr 22, 2021 at 12:49 AM Danny Chan <da...@apache.org> wrote:

> > Should we throw together a PoC/test code for an example Flink pipeline
> that
> will use hudi cdc flags + state ful operators?
>
> I have updated the pr https://github.com/apache/hudi/pull/2854,
>
> see the test case HoodieDataSourceITCase#testStreamReadWithDeletes.
>
> A data source:
>
> change_flag | uuid | name | age | ts | partition
>
> I, id1, Danny, 23, 1970-01-01T00:00:01, par1
> I, id2, Stephen, 33, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> I, id4, Fabian, 31, 1970-01-01T00:00:04, par2
> I, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> I, id6, Emma, 20, 1970-01-01T00:00:06, par3
> I, id7, Bob, 44, 1970-01-01T00:00:07, par4
> I, id8, Han, 56, 1970-01-01T00:00:08, par4
> U, id1, Danny, 24, 1970-01-01T00:00:01, par1
> U, id2, Stephen, 34, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> D, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> D, id9, Jane, 19, 1970-01-01T00:00:06, par3
>
> with streaming query "select name, sum(age) from t1 group by name" returns:
>
> change_flag | name | age_sum
> I, Danny, 24
> I Stephen, 34
>
> The result is the same as a batch snapshot query.
>
> Best,
> Danny Chan
>
> Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 下午1:32写道:
>
> > Keeping compatibility is a must. i.e users should be able to upgrade to
> the
> > new release with the _hoodie_cdc_flag meta column,
> > and be able to query new data (with this new meta col) alongside old data
> > (without this new meta col).
> > In fact, they should be able to downgrade back to previous versions (say
> > there is some other snag they hit), and go back to not writing this new
> > meta column.
> > if this is too hard, then a config to control is not a bad idea at-least
> > for an initial release?
> >
> > Thanks for clarifying the use-case! Makes total sense to me and look
> > forward to getting this going.
> > Should we throw together a PoC/test code for an example Flink pipeline
> that
> > will use hudi cdc flags + state ful operators?
> > It ll help us iron out gaps iteratively, finalize requirements - instead
> of
> > a more top-down, waterfall like model?
> >
> > On Tue, Apr 20, 2021 at 8:25 PM Danny Chan <da...@apache.org> wrote:
> >
> > > > Is it providing the ability to author continuous queries on
> > > Hudi source tables end-end,
> > > given Flink can use the flags to generate retract/upsert streams
> > >
> > > Yes,that's the key point, with these flags plus flink stateful
> operators,
> > > we can have a real time incremental ETL pipeline.
> > >
> > > For example, a global aggregation that consumes cdc stream can do
> > > acc/retract continuously and send the changes to downstream.
> > > The ETL pipeline with cdc stream generates the same result as the batch
> > > snapshot with the same sql query.
> > >
> > > If keeping compatibility is a must with/without the new metadata
> > columns, I
> > > think there is no need to add a config option which brings in
> > > unnecessary overhead. If we do not ensure backward compatibility for
> new
> > > column, then we should add such a config option and by default
> > > disable it.
> > >
> > > Best,
> > > Danny Chan
> > >
> > >
> > > Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 上午6:30写道:
> > >
> > > > Hi Danny,
> > > >
> > > > Read up on the Flink docs as well.
> > > >
> > > > If we don't actually publish data to the metacolumn, I think the
> > overhead
> > > > is pretty low w.r.t avro/parquet. Both are very good at encoding
> nulls.
> > > > But, I feel it's worth adding a HoodieWriteConfig to control this and
> > > since
> > > > addition of meta columns mostly happens in the handles,
> > > > it may not be as bad ? Happy to suggest more concrete ideas on the
> PR.
> > > >
> > > > We still need to test backwards compatibility from different engines
> > > quite
> > > > early though and make sure there are no surprises.
> > > > Hive, Parquet, Spark, Presto all have their own rules on evolution as
> > > well.
> > > > So we need to think this through if/how seamlessly this can be turned
> > on
> > > > for existing tables
> > > >
> > > > As for testing the new column, given Flink is what will be able to
> > > consume
> > > > the flags? Can we write a quick unit test using Dynamic tables?
> > > > I am also curious to understand how the flags help the end user
> > > ultimately?
> > > > Reading the flink docs, I understand the concepts (coming from a
> Kafka
> > > > streams world,
> > > > most of it seem familiar), but what exact problem does the flag solve
> > > that
> > > > exist today? Is it providing the ability to author continuous queries
> > on
> > > > Hudi source tables end-end,
> > > > given Flink can use the flags to generate retract/upsert streams?
> > > >
> > > > For hard deletes, we still need to do some core work to make it
> > available
> > > > in the incremental query. So there's more to be done here for
> cracking
> > > this
> > > > end-end streaming/continuous ETL vision?
> > > >
> > > > Very exciting stuff!
> > > >
> > > > Thanks
> > > > Vinoth
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <da...@apache.org>
> > wrote:
> > > >
> > > > > Hi, i have created a PR here:
> > > > > https://github.com/apache/hudi/pull/2854/files
> > > > >
> > > > > In the PR i do these changes:
> > > > > 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a
> > > config
> > > > > option because i can not find a good way to make the code clean, a
> > > > metadata
> > > > > column is very primitive and a config option would introduce too
> many
> > > > > changes
> > > > > 2. Modify the write handle to add the column: add operation for
> > append
> > > > > handle but merges the changes for create handle and merge handle
> > > > > 3. the flag is only useful for streaming read, so i also merge the
> > > flags
> > > > > for Flink batch reader, Flink streaming reader would emit each
> record
> > > > with
> > > > > the right cdc operation
> > > > >
> > > > > I did not change any Spark code because i'm not familiar with that,
> > > Spark
> > > > > actually can not handle these flags in operators, So by default,
> the
> > > > > column "_hoodie_cdc_operation", it has a value from the Flink
> writer.
> > > > >
> > > > > There may also need some unit tests for the new column in the
> hoodie
> > > > core,
> > > > > but i don't know how to, could you give some help ?
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > >
> > > > > Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:
> > > > >
> > > > > > Thanks @Sivabalan ~
> > > > > >
> > > > > > I agree that parquet and log files should keep sync in metadata
> > > columns
> > > > > in
> > > > > > case there are confusions
> > > > > > and special handling in some use cases like compaction.
> > > > > >
> > > > > > I also agree add a metadata column is more ease to use for SQL
> > > > > connectors.
> > > > > >
> > > > > > We can add a metadata column named "_hoodie_change_flag" and a
> > config
> > > > > > option to default disable this metadata column, what do you
> think?
> > > > > >
> > > > > > Best,
> > > > > > Danny Chan
> > > > > >
> > > > > >
> > > > > >
> > > > > > Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
> > > > > >
> > > > > >> wrt changes if we plan to add this only to log files, compaction
> > > needs
> > > > > to
> > > > > >> be fixed to omit this column to the minimum.
> > > > > >>
> > > > > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >> > Just got a chance to read about dynamic tables. sounds
> > > interesting.
> > > > > >> >
> > > > > >> > some thoughts on your questions:
> > > > > >> > - yes, just MOR makes sense.
> > > > > >> > - But adding this new meta column only to avro logs might
> incur
> > > some
> > > > > non
> > > > > >> > trivial changes. Since as of today, schema of avro and base
> > files
> > > > are
> > > > > in
> > > > > >> > sync. If this new col is going to store just 2
> > > > > >> bits(insert/update/delete),
> > > > > >> > we might as well add it to base files as well and keep it
> > simple.
> > > > But
> > > > > we
> > > > > >> > can make it configurable so that only those interested can
> > enable
> > > > this
> > > > > >> new
> > > > > >> > column to hudi dataset.
> > > > > >> > - Wondering, even today we can achieve this by using a
> > > transformer,
> > > > to
> > > > > >> set
> > > > > >> > right values to this new column. I mean, users need to add
> this
> > > col
> > > > in
> > > > > >> > their schema when defining the hudi dataset and if the
> incoming
> > > data
> > > > > has
> > > > > >> > right values for this col (using deltastreamer's transformer
> or
> > by
> > > > > >> explicit
> > > > > >> > means), we don't even need to add this as a meta column. Just
> > > saying
> > > > > >> that
> > > > > >> > we can achieve this even today. But if we are looking to
> > integrate
> > > > w/
> > > > > >> SQL
> > > > > >> > DML, then adding this support would be elegant.
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <
> > danny0405@apache.org
> > > >
> > > > > >> wrote:
> > > > > >> >
> > > > > >> >> Thanks Vinoth ~
> > > > > >> >>
> > > > > >> >> Here is a document about the notion of 《Flink Dynamic
> > Table》[1] ,
> > > > > every
> > > > > >> >> operator that has accumulate state can handle
> > > > > >> retractions(UPDATE_BEFORE or
> > > > > >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so
> > that
> > > > each
> > > > > >> >> operator can consume the CDC format messages in streaming
> way.
> > > > > >> >>
> > > > > >> >> > Another aspect to think about is, how the new flag can be
> > added
> > > > to
> > > > > >> >> existing
> > > > > >> >> tables and if the schema evolution would be fine.
> > > > > >> >>
> > > > > >> >> That is also my concern, but it's not that bad because
> adding a
> > > new
> > > > > >> column
> > > > > >> >> is still compatible for old schema in Avro.
> > > > > >> >>
> > > > > >> >> [1]
> > > > > >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > > > > >> >>
> > > > > >> >> Best,
> > > > > >> >> Danny Chan
> > > > > >> >>
> > > > > >> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> > > > > >> >>
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Is the intent of the flag to convey if an insert delete or
> > > update
> > > > > >> >> changed
> > > > > >> >> > the record? If so I would imagine that we do this even for
> > cow
> > > > > >> tables,
> > > > > >> >> > since that also supports a logical notion of a change
> stream
> > > > using
> > > > > >> the
> > > > > >> >> > commit_time meta field.
> > > > > >> >> >
> > > > > >> >> > You may be right, but I am trying to understand the use
> case
> > > for
> > > > > >> this.
> > > > > >> >> Any
> > > > > >> >> > links/flink docs I can read?
> > > > > >> >> >
> > > > > >> >> > Another aspect to think about is, how the new flag can be
> > added
> > > > to
> > > > > >> >> existing
> > > > > >> >> > tables and if the schema evolution would be fine.
> > > > > >> >> >
> > > > > >> >> > Thanks
> > > > > >> >> > Vinoth
> > > > > >> >> >
> > > > > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <
> > > danny0405@apache.org>
> > > > > >> wrote:
> > > > > >> >> >
> > > > > >> >> > > I tries to do a POC for flink locally and it works well,
> in
> > > the
> > > > > PR
> > > > > >> i
> > > > > >> >> add
> > > > > >> >> > a
> > > > > >> >> > > new metadata column named "_hoodie_change_flag", but
> > > actually i
> > > > > >> found
> > > > > >> >> > that
> > > > > >> >> > > only log format needs this flag, and the Spark may has no
> > > > ability
> > > > > >> to
> > > > > >> >> > handle
> > > > > >> >> > > the flag for incremental processing yet.
> > > > > >> >> > >
> > > > > >> >> > > So should i add the "_hoodie_change_flag" metadata
> column,
> > or
> > > > is
> > > > > >> there
> > > > > >> >> > any
> > > > > >> >> > > better solution for this?
> > > > > >> >> > >
> > > > > >> >> > > Best,
> > > > > >> >> > > Danny Chan
> > > > > >> >> > >
> > > > > >> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五
> 上午11:08写道:
> > > > > >> >> > >
> > > > > >> >> > > > Thanks cool, then the left questions are:
> > > > > >> >> > > >
> > > > > >> >> > > > - where we record these change, should we add a builtin
> > > meta
> > > > > >> field
> > > > > >> >> such
> > > > > >> >> > > as
> > > > > >> >> > > > the _change_flag_ like the other system columns for e.g
> > > > > >> >> > > _hoodie_commit_time
> > > > > >> >> > > > - what kind of table should keep these flags, in my
> > > thoughts,
> > > > > we
> > > > > >> >> should
> > > > > >> >> > > > only add these flags for "MERGE_ON_READ" table, and
> only
> > > for
> > > > > AVRO
> > > > > >> >> logs
> > > > > >> >> > > > - we should add a config there to switch on/off the
> flags
> > > in
> > > > > >> system
> > > > > >> >> > meta
> > > > > >> >> > > > fields
> > > > > >> >> > > >
> > > > > >> >> > > > What do you think?
> > > > > >> >> > > >
> > > > > >> >> > > > Best,
> > > > > >> >> > > > Danny Chan
> > > > > >> >> > > >
> > > > > >> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四
> > 上午10:58写道:
> > > > > >> >> > > >
> > > > > >> >> > > >> >> Oops, the image crushes, for "change flags", i
> mean:
> > > > > insert,
> > > > > >> >> > > >> update(before
> > > > > >> >> > > >> and after) and delete.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Yes, the image I attached is also about these flags.
> > > > > >> >> > > >> [image: image (3).png]
> > > > > >> >> > > >>
> > > > > >> >> > > >> +1 for the idea.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Best,
> > > > > >> >> > > >> Vino
> > > > > >> >> > > >>
> > > > > >> >> > > >>
> > > > > >> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四
> > 上午10:03写道:
> > > > > >> >> > > >>
> > > > > >> >> > > >>> Oops, the image crushes, for "change flags", i mean:
> > > > insert,
> > > > > >> >> > > >>> update(before
> > > > > >> >> > > >>> and after) and delete.
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> The Flink engine can propagate the change flags
> > > internally
> > > > > >> between
> > > > > >> >> > its
> > > > > >> >> > > >>> operators, if HUDI can send the change flags to
> Flink,
> > > the
> > > > > >> >> > incremental
> > > > > >> >> > > >>> calculation of CDC would be very natural (almost
> > > > transparent
> > > > > to
> > > > > >> >> > users).
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> Best,
> > > > > >> >> > > >>> Danny Chan
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三
> > > 下午11:32写道:
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> > Hi Danny,
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Yes, incremental query( or says "incremental
> > > processing")
> > > > > has
> > > > > >> >> > always
> > > > > >> >> > > >>> been
> > > > > >> >> > > >>> > an important feature of the Hudi framework. If we
> can
> > > > make
> > > > > >> this
> > > > > >> >> > > feature
> > > > > >> >> > > >>> > better, it will be even more exciting.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > In the data warehouse, in some complex
> calculations,
> > I
> > > > have
> > > > > >> not
> > > > > >> >> > > found a
> > > > > >> >> > > >>> > good way to conveniently use some incremental
> change
> > > data
> > > > > >> >> (similar
> > > > > >> >> > to
> > > > > >> >> > > >>> the
> > > > > >> >> > > >>> > concept of retracement stream in Flink?) to locally
> > > > > "correct"
> > > > > >> >> the
> > > > > >> >> > > >>> > aggregation result (these aggregation results may
> > > belong
> > > > to
> > > > > >> the
> > > > > >> >> DWS
> > > > > >> >> > > >>> layer).
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> > > > scenarios
> > > > > >> >> (single
> > > > > >> >> > > >>> table
> > > > > >> >> > > >>> > or an algorithm that can be very easily
> retracement)
> > > can
> > > > be
> > > > > >> >> dealt
> > > > > >> >> > > with
> > > > > >> >> > > >>> > based on the incremental calculation of CDC.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Of course, the expression of incremental
> calculation
> > on
> > > > > >> various
> > > > > >> >> > > >>> occasions
> > > > > >> >> > > >>> > is sometimes not very clear. Maybe we will discuss
> it
> > > > more
> > > > > >> >> clearly
> > > > > >> >> > in
> > > > > >> >> > > >>> > specific scenarios.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > >> If HUDI can keep and propagate these change
> flags
> > to
> > > > its
> > > > > >> >> > > consumers,
> > > > > >> >> > > >>> we
> > > > > >> >> > > >>> > can
> > > > > >> >> > > >>> > use HUDI as the unified format for the pipeline.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Regarding the "change flags" here, do you mean the
> > > flags
> > > > > like
> > > > > >> >> the
> > > > > >> >> > one
> > > > > >> >> > > >>> > shown in the figure below?
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > [image: image.png]
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Best,
> > > > > >> >> > > >>> > Vino
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三
> > > > 下午6:24写道:
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a
> > discuss
> > > > > about
> > > > > >> >> using
> > > > > >> >> > > >>> HUDI as
> > > > > >> >> > > >>> >> the unified storage/format for data warehouse/lake
> > > > > >> incremental
> > > > > >> >> > > >>> >> computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Usually people divide data warehouse production
> into
> > > > > several
> > > > > >> >> > levels,
> > > > > >> >> > > >>> such
> > > > > >> >> > > >>> >> as the ODS(operation data store), DWD(data
> warehouse
> > > > > >> details),
> > > > > >> >> > > >>> DWS(data
> > > > > >> >> > > >>> >> warehouse service), ADS(application data service).
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime)
> computation
> > > > > cases,
> > > > > >> a
> > > > > >> >> big
> > > > > >> >> > > >>> topic is
> > > > > >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds
> > of
> > > > > RDBMS
> > > > > >> >> into
> > > > > >> >> > the
> > > > > >> >> > > >>> >> warehouse/lake, the cdc patten records and
> propagate
> > > the
> > > > > >> change
> > > > > >> >> > > flag:
> > > > > >> >> > > >>> >> insert, update(before and after) and delete for
> the
> > > > > >> consumer,
> > > > > >> >> with
> > > > > >> >> > > >>> these
> > > > > >> >> > > >>> >> flags, the downstream engines can have a realtime
> > > > > >> accumulation
> > > > > >> >> > > >>> >> computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Using streaming engine like Flink, we can have a
> > > totally
> > > > > >> >> > > >>> NEAR-REAL-TIME
> > > > > >> >> > > >>> >> computation pipeline for each of the layer.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> If HUDI can keep and propagate these change flags
> to
> > > its
> > > > > >> >> > consumers,
> > > > > >> >> > > >>> we can
> > > > > >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Best,
> > > > > >> >> > > >>> >> Danny Chan
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>>
> > > > > >> >> > > >>
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >> > --
> > > > > >> > Regards,
> > > > > >> > -Sivabalan
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Regards,
> > > > > >> -Sivabalan
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
> Should we throw together a PoC/test code for an example Flink pipeline
that
will use hudi cdc flags + state ful operators?

I have updated the pr https://github.com/apache/hudi/pull/2854,

see the test case HoodieDataSourceITCase#testStreamReadWithDeletes.

A data source:

change_flag | uuid | name | age | ts | partition

I, id1, Danny, 23, 1970-01-01T00:00:01, par1
I, id2, Stephen, 33, 1970-01-01T00:00:02, par1
I, id3, Julian, 53, 1970-01-01T00:00:03, par2
I, id4, Fabian, 31, 1970-01-01T00:00:04, par2
I, id5, Sophia, 18, 1970-01-01T00:00:05, par3
I, id6, Emma, 20, 1970-01-01T00:00:06, par3
I, id7, Bob, 44, 1970-01-01T00:00:07, par4
I, id8, Han, 56, 1970-01-01T00:00:08, par4
U, id1, Danny, 24, 1970-01-01T00:00:01, par1
U, id2, Stephen, 34, 1970-01-01T00:00:02, par1
I, id3, Julian, 53, 1970-01-01T00:00:03, par2
D, id5, Sophia, 18, 1970-01-01T00:00:05, par3
D, id9, Jane, 19, 1970-01-01T00:00:06, par3

with streaming query "select name, sum(age) from t1 group by name" returns:

change_flag | name | age_sum
I, Danny, 24
I Stephen, 34

The result is the same as a batch snapshot query.

Best,
Danny Chan

Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 下午1:32写道:

> Keeping compatibility is a must. i.e users should be able to upgrade to the
> new release with the _hoodie_cdc_flag meta column,
> and be able to query new data (with this new meta col) alongside old data
> (without this new meta col).
> In fact, they should be able to downgrade back to previous versions (say
> there is some other snag they hit), and go back to not writing this new
> meta column.
> if this is too hard, then a config to control is not a bad idea at-least
> for an initial release?
>
> Thanks for clarifying the use-case! Makes total sense to me and look
> forward to getting this going.
> Should we throw together a PoC/test code for an example Flink pipeline that
> will use hudi cdc flags + state ful operators?
> It ll help us iron out gaps iteratively, finalize requirements - instead of
> a more top-down, waterfall like model?
>
> On Tue, Apr 20, 2021 at 8:25 PM Danny Chan <da...@apache.org> wrote:
>
> > > Is it providing the ability to author continuous queries on
> > Hudi source tables end-end,
> > given Flink can use the flags to generate retract/upsert streams
> >
> > Yes,that's the key point, with these flags plus flink stateful operators,
> > we can have a real time incremental ETL pipeline.
> >
> > For example, a global aggregation that consumes cdc stream can do
> > acc/retract continuously and send the changes to downstream.
> > The ETL pipeline with cdc stream generates the same result as the batch
> > snapshot with the same sql query.
> >
> > If keeping compatibility is a must with/without the new metadata
> columns, I
> > think there is no need to add a config option which brings in
> > unnecessary overhead. If we do not ensure backward compatibility for new
> > column, then we should add such a config option and by default
> > disable it.
> >
> > Best,
> > Danny Chan
> >
> >
> > Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 上午6:30写道:
> >
> > > Hi Danny,
> > >
> > > Read up on the Flink docs as well.
> > >
> > > If we don't actually publish data to the metacolumn, I think the
> overhead
> > > is pretty low w.r.t avro/parquet. Both are very good at encoding nulls.
> > > But, I feel it's worth adding a HoodieWriteConfig to control this and
> > since
> > > addition of meta columns mostly happens in the handles,
> > > it may not be as bad ? Happy to suggest more concrete ideas on the PR.
> > >
> > > We still need to test backwards compatibility from different engines
> > quite
> > > early though and make sure there are no surprises.
> > > Hive, Parquet, Spark, Presto all have their own rules on evolution as
> > well.
> > > So we need to think this through if/how seamlessly this can be turned
> on
> > > for existing tables
> > >
> > > As for testing the new column, given Flink is what will be able to
> > consume
> > > the flags? Can we write a quick unit test using Dynamic tables?
> > > I am also curious to understand how the flags help the end user
> > ultimately?
> > > Reading the flink docs, I understand the concepts (coming from a Kafka
> > > streams world,
> > > most of it seem familiar), but what exact problem does the flag solve
> > that
> > > exist today? Is it providing the ability to author continuous queries
> on
> > > Hudi source tables end-end,
> > > given Flink can use the flags to generate retract/upsert streams?
> > >
> > > For hard deletes, we still need to do some core work to make it
> available
> > > in the incremental query. So there's more to be done here for cracking
> > this
> > > end-end streaming/continuous ETL vision?
> > >
> > > Very exciting stuff!
> > >
> > > Thanks
> > > Vinoth
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <da...@apache.org>
> wrote:
> > >
> > > > Hi, i have created a PR here:
> > > > https://github.com/apache/hudi/pull/2854/files
> > > >
> > > > In the PR i do these changes:
> > > > 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a
> > config
> > > > option because i can not find a good way to make the code clean, a
> > > metadata
> > > > column is very primitive and a config option would introduce too many
> > > > changes
> > > > 2. Modify the write handle to add the column: add operation for
> append
> > > > handle but merges the changes for create handle and merge handle
> > > > 3. the flag is only useful for streaming read, so i also merge the
> > flags
> > > > for Flink batch reader, Flink streaming reader would emit each record
> > > with
> > > > the right cdc operation
> > > >
> > > > I did not change any Spark code because i'm not familiar with that,
> > Spark
> > > > actually can not handle these flags in operators, So by default, the
> > > > column "_hoodie_cdc_operation", it has a value from the Flink writer.
> > > >
> > > > There may also need some unit tests for the new column in the hoodie
> > > core,
> > > > but i don't know how to, could you give some help ?
> > > >
> > > > Best,
> > > > Danny Chan
> > > >
> > > > Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:
> > > >
> > > > > Thanks @Sivabalan ~
> > > > >
> > > > > I agree that parquet and log files should keep sync in metadata
> > columns
> > > > in
> > > > > case there are confusions
> > > > > and special handling in some use cases like compaction.
> > > > >
> > > > > I also agree add a metadata column is more ease to use for SQL
> > > > connectors.
> > > > >
> > > > > We can add a metadata column named "_hoodie_change_flag" and a
> config
> > > > > option to default disable this metadata column, what do you think?
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > >
> > > > >
> > > > >
> > > > > Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
> > > > >
> > > > >> wrt changes if we plan to add this only to log files, compaction
> > needs
> > > > to
> > > > >> be fixed to omit this column to the minimum.
> > > > >>
> > > > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com>
> > wrote:
> > > > >>
> > > > >> > Just got a chance to read about dynamic tables. sounds
> > interesting.
> > > > >> >
> > > > >> > some thoughts on your questions:
> > > > >> > - yes, just MOR makes sense.
> > > > >> > - But adding this new meta column only to avro logs might incur
> > some
> > > > non
> > > > >> > trivial changes. Since as of today, schema of avro and base
> files
> > > are
> > > > in
> > > > >> > sync. If this new col is going to store just 2
> > > > >> bits(insert/update/delete),
> > > > >> > we might as well add it to base files as well and keep it
> simple.
> > > But
> > > > we
> > > > >> > can make it configurable so that only those interested can
> enable
> > > this
> > > > >> new
> > > > >> > column to hudi dataset.
> > > > >> > - Wondering, even today we can achieve this by using a
> > transformer,
> > > to
> > > > >> set
> > > > >> > right values to this new column. I mean, users need to add this
> > col
> > > in
> > > > >> > their schema when defining the hudi dataset and if the incoming
> > data
> > > > has
> > > > >> > right values for this col (using deltastreamer's transformer or
> by
> > > > >> explicit
> > > > >> > means), we don't even need to add this as a meta column. Just
> > saying
> > > > >> that
> > > > >> > we can achieve this even today. But if we are looking to
> integrate
> > > w/
> > > > >> SQL
> > > > >> > DML, then adding this support would be elegant.
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <
> danny0405@apache.org
> > >
> > > > >> wrote:
> > > > >> >
> > > > >> >> Thanks Vinoth ~
> > > > >> >>
> > > > >> >> Here is a document about the notion of 《Flink Dynamic
> Table》[1] ,
> > > > every
> > > > >> >> operator that has accumulate state can handle
> > > > >> retractions(UPDATE_BEFORE or
> > > > >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so
> that
> > > each
> > > > >> >> operator can consume the CDC format messages in streaming way.
> > > > >> >>
> > > > >> >> > Another aspect to think about is, how the new flag can be
> added
> > > to
> > > > >> >> existing
> > > > >> >> tables and if the schema evolution would be fine.
> > > > >> >>
> > > > >> >> That is also my concern, but it's not that bad because adding a
> > new
> > > > >> column
> > > > >> >> is still compatible for old schema in Avro.
> > > > >> >>
> > > > >> >> [1]
> > > > >> >>
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > > > >> >>
> > > > >> >> Best,
> > > > >> >> Danny Chan
> > > > >> >>
> > > > >> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> > > > >> >>
> > > > >> >> > Hi,
> > > > >> >> >
> > > > >> >> > Is the intent of the flag to convey if an insert delete or
> > update
> > > > >> >> changed
> > > > >> >> > the record? If so I would imagine that we do this even for
> cow
> > > > >> tables,
> > > > >> >> > since that also supports a logical notion of a change stream
> > > using
> > > > >> the
> > > > >> >> > commit_time meta field.
> > > > >> >> >
> > > > >> >> > You may be right, but I am trying to understand the use case
> > for
> > > > >> this.
> > > > >> >> Any
> > > > >> >> > links/flink docs I can read?
> > > > >> >> >
> > > > >> >> > Another aspect to think about is, how the new flag can be
> added
> > > to
> > > > >> >> existing
> > > > >> >> > tables and if the schema evolution would be fine.
> > > > >> >> >
> > > > >> >> > Thanks
> > > > >> >> > Vinoth
> > > > >> >> >
> > > > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <
> > danny0405@apache.org>
> > > > >> wrote:
> > > > >> >> >
> > > > >> >> > > I tries to do a POC for flink locally and it works well, in
> > the
> > > > PR
> > > > >> i
> > > > >> >> add
> > > > >> >> > a
> > > > >> >> > > new metadata column named "_hoodie_change_flag", but
> > actually i
> > > > >> found
> > > > >> >> > that
> > > > >> >> > > only log format needs this flag, and the Spark may has no
> > > ability
> > > > >> to
> > > > >> >> > handle
> > > > >> >> > > the flag for incremental processing yet.
> > > > >> >> > >
> > > > >> >> > > So should i add the "_hoodie_change_flag" metadata column,
> or
> > > is
> > > > >> there
> > > > >> >> > any
> > > > >> >> > > better solution for this?
> > > > >> >> > >
> > > > >> >> > > Best,
> > > > >> >> > > Danny Chan
> > > > >> >> > >
> > > > >> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> > > > >> >> > >
> > > > >> >> > > > Thanks cool, then the left questions are:
> > > > >> >> > > >
> > > > >> >> > > > - where we record these change, should we add a builtin
> > meta
> > > > >> field
> > > > >> >> such
> > > > >> >> > > as
> > > > >> >> > > > the _change_flag_ like the other system columns for e.g
> > > > >> >> > > _hoodie_commit_time
> > > > >> >> > > > - what kind of table should keep these flags, in my
> > thoughts,
> > > > we
> > > > >> >> should
> > > > >> >> > > > only add these flags for "MERGE_ON_READ" table, and only
> > for
> > > > AVRO
> > > > >> >> logs
> > > > >> >> > > > - we should add a config there to switch on/off the flags
> > in
> > > > >> system
> > > > >> >> > meta
> > > > >> >> > > > fields
> > > > >> >> > > >
> > > > >> >> > > > What do you think?
> > > > >> >> > > >
> > > > >> >> > > > Best,
> > > > >> >> > > > Danny Chan
> > > > >> >> > > >
> > > > >> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四
> 上午10:58写道:
> > > > >> >> > > >
> > > > >> >> > > >> >> Oops, the image crushes, for "change flags", i mean:
> > > > insert,
> > > > >> >> > > >> update(before
> > > > >> >> > > >> and after) and delete.
> > > > >> >> > > >>
> > > > >> >> > > >> Yes, the image I attached is also about these flags.
> > > > >> >> > > >> [image: image (3).png]
> > > > >> >> > > >>
> > > > >> >> > > >> +1 for the idea.
> > > > >> >> > > >>
> > > > >> >> > > >> Best,
> > > > >> >> > > >> Vino
> > > > >> >> > > >>
> > > > >> >> > > >>
> > > > >> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四
> 上午10:03写道:
> > > > >> >> > > >>
> > > > >> >> > > >>> Oops, the image crushes, for "change flags", i mean:
> > > insert,
> > > > >> >> > > >>> update(before
> > > > >> >> > > >>> and after) and delete.
> > > > >> >> > > >>>
> > > > >> >> > > >>> The Flink engine can propagate the change flags
> > internally
> > > > >> between
> > > > >> >> > its
> > > > >> >> > > >>> operators, if HUDI can send the change flags to Flink,
> > the
> > > > >> >> > incremental
> > > > >> >> > > >>> calculation of CDC would be very natural (almost
> > > transparent
> > > > to
> > > > >> >> > users).
> > > > >> >> > > >>>
> > > > >> >> > > >>> Best,
> > > > >> >> > > >>> Danny Chan
> > > > >> >> > > >>>
> > > > >> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三
> > 下午11:32写道:
> > > > >> >> > > >>>
> > > > >> >> > > >>> > Hi Danny,
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Yes, incremental query( or says "incremental
> > processing")
> > > > has
> > > > >> >> > always
> > > > >> >> > > >>> been
> > > > >> >> > > >>> > an important feature of the Hudi framework. If we can
> > > make
> > > > >> this
> > > > >> >> > > feature
> > > > >> >> > > >>> > better, it will be even more exciting.
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > In the data warehouse, in some complex calculations,
> I
> > > have
> > > > >> not
> > > > >> >> > > found a
> > > > >> >> > > >>> > good way to conveniently use some incremental change
> > data
> > > > >> >> (similar
> > > > >> >> > to
> > > > >> >> > > >>> the
> > > > >> >> > > >>> > concept of retracement stream in Flink?) to locally
> > > > "correct"
> > > > >> >> the
> > > > >> >> > > >>> > aggregation result (these aggregation results may
> > belong
> > > to
> > > > >> the
> > > > >> >> DWS
> > > > >> >> > > >>> layer).
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> > > scenarios
> > > > >> >> (single
> > > > >> >> > > >>> table
> > > > >> >> > > >>> > or an algorithm that can be very easily retracement)
> > can
> > > be
> > > > >> >> dealt
> > > > >> >> > > with
> > > > >> >> > > >>> > based on the incremental calculation of CDC.
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Of course, the expression of incremental calculation
> on
> > > > >> various
> > > > >> >> > > >>> occasions
> > > > >> >> > > >>> > is sometimes not very clear. Maybe we will discuss it
> > > more
> > > > >> >> clearly
> > > > >> >> > in
> > > > >> >> > > >>> > specific scenarios.
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > >> If HUDI can keep and propagate these change flags
> to
> > > its
> > > > >> >> > > consumers,
> > > > >> >> > > >>> we
> > > > >> >> > > >>> > can
> > > > >> >> > > >>> > use HUDI as the unified format for the pipeline.
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Regarding the "change flags" here, do you mean the
> > flags
> > > > like
> > > > >> >> the
> > > > >> >> > one
> > > > >> >> > > >>> > shown in the figure below?
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > [image: image.png]
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Best,
> > > > >> >> > > >>> > Vino
> > > > >> >> > > >>> >
> > > > >> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三
> > > 下午6:24写道:
> > > > >> >> > > >>> >
> > > > >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a
> discuss
> > > > about
> > > > >> >> using
> > > > >> >> > > >>> HUDI as
> > > > >> >> > > >>> >> the unified storage/format for data warehouse/lake
> > > > >> incremental
> > > > >> >> > > >>> >> computation.
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> Usually people divide data warehouse production into
> > > > several
> > > > >> >> > levels,
> > > > >> >> > > >>> such
> > > > >> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> > > > >> details),
> > > > >> >> > > >>> DWS(data
> > > > >> >> > > >>> >> warehouse service), ADS(application data service).
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
> > > > cases,
> > > > >> a
> > > > >> >> big
> > > > >> >> > > >>> topic is
> > > > >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds
> of
> > > > RDBMS
> > > > >> >> into
> > > > >> >> > the
> > > > >> >> > > >>> >> warehouse/lake, the cdc patten records and propagate
> > the
> > > > >> change
> > > > >> >> > > flag:
> > > > >> >> > > >>> >> insert, update(before and after) and delete for the
> > > > >> consumer,
> > > > >> >> with
> > > > >> >> > > >>> these
> > > > >> >> > > >>> >> flags, the downstream engines can have a realtime
> > > > >> accumulation
> > > > >> >> > > >>> >> computation.
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> Using streaming engine like Flink, we can have a
> > totally
> > > > >> >> > > >>> NEAR-REAL-TIME
> > > > >> >> > > >>> >> computation pipeline for each of the layer.
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> If HUDI can keep and propagate these change flags to
> > its
> > > > >> >> > consumers,
> > > > >> >> > > >>> we can
> > > > >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >> Best,
> > > > >> >> > > >>> >> Danny Chan
> > > > >> >> > > >>> >>
> > > > >> >> > > >>> >
> > > > >> >> > > >>>
> > > > >> >> > > >>
> > > > >> >> > >
> > > > >> >> >
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > Regards,
> > > > >> > -Sivabalan
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Regards,
> > > > >> -Sivabalan
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Vinoth Chandar <vi...@apache.org>.
Keeping compatibility is a must. i.e users should be able to upgrade to the
new release with the _hoodie_cdc_flag meta column,
and be able to query new data (with this new meta col) alongside old data
(without this new meta col).
In fact, they should be able to downgrade back to previous versions (say
there is some other snag they hit), and go back to not writing this new
meta column.
if this is too hard, then a config to control is not a bad idea at-least
for an initial release?

Thanks for clarifying the use-case! Makes total sense to me and look
forward to getting this going.
Should we throw together a PoC/test code for an example Flink pipeline that
will use hudi cdc flags + state ful operators?
It ll help us iron out gaps iteratively, finalize requirements - instead of
a more top-down, waterfall like model?

On Tue, Apr 20, 2021 at 8:25 PM Danny Chan <da...@apache.org> wrote:

> > Is it providing the ability to author continuous queries on
> Hudi source tables end-end,
> given Flink can use the flags to generate retract/upsert streams
>
> Yes,that's the key point, with these flags plus flink stateful operators,
> we can have a real time incremental ETL pipeline.
>
> For example, a global aggregation that consumes cdc stream can do
> acc/retract continuously and send the changes to downstream.
> The ETL pipeline with cdc stream generates the same result as the batch
> snapshot with the same sql query.
>
> If keeping compatibility is a must with/without the new metadata columns, I
> think there is no need to add a config option which brings in
> unnecessary overhead. If we do not ensure backward compatibility for new
> column, then we should add such a config option and by default
> disable it.
>
> Best,
> Danny Chan
>
>
> Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 上午6:30写道:
>
> > Hi Danny,
> >
> > Read up on the Flink docs as well.
> >
> > If we don't actually publish data to the metacolumn, I think the overhead
> > is pretty low w.r.t avro/parquet. Both are very good at encoding nulls.
> > But, I feel it's worth adding a HoodieWriteConfig to control this and
> since
> > addition of meta columns mostly happens in the handles,
> > it may not be as bad ? Happy to suggest more concrete ideas on the PR.
> >
> > We still need to test backwards compatibility from different engines
> quite
> > early though and make sure there are no surprises.
> > Hive, Parquet, Spark, Presto all have their own rules on evolution as
> well.
> > So we need to think this through if/how seamlessly this can be turned on
> > for existing tables
> >
> > As for testing the new column, given Flink is what will be able to
> consume
> > the flags? Can we write a quick unit test using Dynamic tables?
> > I am also curious to understand how the flags help the end user
> ultimately?
> > Reading the flink docs, I understand the concepts (coming from a Kafka
> > streams world,
> > most of it seem familiar), but what exact problem does the flag solve
> that
> > exist today? Is it providing the ability to author continuous queries on
> > Hudi source tables end-end,
> > given Flink can use the flags to generate retract/upsert streams?
> >
> > For hard deletes, we still need to do some core work to make it available
> > in the incremental query. So there's more to be done here for cracking
> this
> > end-end streaming/continuous ETL vision?
> >
> > Very exciting stuff!
> >
> > Thanks
> > Vinoth
> >
> >
> >
> >
> >
> > On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <da...@apache.org> wrote:
> >
> > > Hi, i have created a PR here:
> > > https://github.com/apache/hudi/pull/2854/files
> > >
> > > In the PR i do these changes:
> > > 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a
> config
> > > option because i can not find a good way to make the code clean, a
> > metadata
> > > column is very primitive and a config option would introduce too many
> > > changes
> > > 2. Modify the write handle to add the column: add operation for append
> > > handle but merges the changes for create handle and merge handle
> > > 3. the flag is only useful for streaming read, so i also merge the
> flags
> > > for Flink batch reader, Flink streaming reader would emit each record
> > with
> > > the right cdc operation
> > >
> > > I did not change any Spark code because i'm not familiar with that,
> Spark
> > > actually can not handle these flags in operators, So by default, the
> > > column "_hoodie_cdc_operation", it has a value from the Flink writer.
> > >
> > > There may also need some unit tests for the new column in the hoodie
> > core,
> > > but i don't know how to, could you give some help ?
> > >
> > > Best,
> > > Danny Chan
> > >
> > > Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:
> > >
> > > > Thanks @Sivabalan ~
> > > >
> > > > I agree that parquet and log files should keep sync in metadata
> columns
> > > in
> > > > case there are confusions
> > > > and special handling in some use cases like compaction.
> > > >
> > > > I also agree add a metadata column is more ease to use for SQL
> > > connectors.
> > > >
> > > > We can add a metadata column named "_hoodie_change_flag" and a config
> > > > option to default disable this metadata column, what do you think?
> > > >
> > > > Best,
> > > > Danny Chan
> > > >
> > > >
> > > >
> > > > Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
> > > >
> > > >> wrt changes if we plan to add this only to log files, compaction
> needs
> > > to
> > > >> be fixed to omit this column to the minimum.
> > > >>
> > > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com>
> wrote:
> > > >>
> > > >> > Just got a chance to read about dynamic tables. sounds
> interesting.
> > > >> >
> > > >> > some thoughts on your questions:
> > > >> > - yes, just MOR makes sense.
> > > >> > - But adding this new meta column only to avro logs might incur
> some
> > > non
> > > >> > trivial changes. Since as of today, schema of avro and base files
> > are
> > > in
> > > >> > sync. If this new col is going to store just 2
> > > >> bits(insert/update/delete),
> > > >> > we might as well add it to base files as well and keep it simple.
> > But
> > > we
> > > >> > can make it configurable so that only those interested can enable
> > this
> > > >> new
> > > >> > column to hudi dataset.
> > > >> > - Wondering, even today we can achieve this by using a
> transformer,
> > to
> > > >> set
> > > >> > right values to this new column. I mean, users need to add this
> col
> > in
> > > >> > their schema when defining the hudi dataset and if the incoming
> data
> > > has
> > > >> > right values for this col (using deltastreamer's transformer or by
> > > >> explicit
> > > >> > means), we don't even need to add this as a meta column. Just
> saying
> > > >> that
> > > >> > we can achieve this even today. But if we are looking to integrate
> > w/
> > > >> SQL
> > > >> > DML, then adding this support would be elegant.
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0405@apache.org
> >
> > > >> wrote:
> > > >> >
> > > >> >> Thanks Vinoth ~
> > > >> >>
> > > >> >> Here is a document about the notion of 《Flink Dynamic Table》[1] ,
> > > every
> > > >> >> operator that has accumulate state can handle
> > > >> retractions(UPDATE_BEFORE or
> > > >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that
> > each
> > > >> >> operator can consume the CDC format messages in streaming way.
> > > >> >>
> > > >> >> > Another aspect to think about is, how the new flag can be added
> > to
> > > >> >> existing
> > > >> >> tables and if the schema evolution would be fine.
> > > >> >>
> > > >> >> That is also my concern, but it's not that bad because adding a
> new
> > > >> column
> > > >> >> is still compatible for old schema in Avro.
> > > >> >>
> > > >> >> [1]
> > > >> >>
> > > >> >>
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > > >> >>
> > > >> >> Best,
> > > >> >> Danny Chan
> > > >> >>
> > > >> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> > > >> >>
> > > >> >> > Hi,
> > > >> >> >
> > > >> >> > Is the intent of the flag to convey if an insert delete or
> update
> > > >> >> changed
> > > >> >> > the record? If so I would imagine that we do this even for cow
> > > >> tables,
> > > >> >> > since that also supports a logical notion of a change stream
> > using
> > > >> the
> > > >> >> > commit_time meta field.
> > > >> >> >
> > > >> >> > You may be right, but I am trying to understand the use case
> for
> > > >> this.
> > > >> >> Any
> > > >> >> > links/flink docs I can read?
> > > >> >> >
> > > >> >> > Another aspect to think about is, how the new flag can be added
> > to
> > > >> >> existing
> > > >> >> > tables and if the schema evolution would be fine.
> > > >> >> >
> > > >> >> > Thanks
> > > >> >> > Vinoth
> > > >> >> >
> > > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <
> danny0405@apache.org>
> > > >> wrote:
> > > >> >> >
> > > >> >> > > I tries to do a POC for flink locally and it works well, in
> the
> > > PR
> > > >> i
> > > >> >> add
> > > >> >> > a
> > > >> >> > > new metadata column named "_hoodie_change_flag", but
> actually i
> > > >> found
> > > >> >> > that
> > > >> >> > > only log format needs this flag, and the Spark may has no
> > ability
> > > >> to
> > > >> >> > handle
> > > >> >> > > the flag for incremental processing yet.
> > > >> >> > >
> > > >> >> > > So should i add the "_hoodie_change_flag" metadata column, or
> > is
> > > >> there
> > > >> >> > any
> > > >> >> > > better solution for this?
> > > >> >> > >
> > > >> >> > > Best,
> > > >> >> > > Danny Chan
> > > >> >> > >
> > > >> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> > > >> >> > >
> > > >> >> > > > Thanks cool, then the left questions are:
> > > >> >> > > >
> > > >> >> > > > - where we record these change, should we add a builtin
> meta
> > > >> field
> > > >> >> such
> > > >> >> > > as
> > > >> >> > > > the _change_flag_ like the other system columns for e.g
> > > >> >> > > _hoodie_commit_time
> > > >> >> > > > - what kind of table should keep these flags, in my
> thoughts,
> > > we
> > > >> >> should
> > > >> >> > > > only add these flags for "MERGE_ON_READ" table, and only
> for
> > > AVRO
> > > >> >> logs
> > > >> >> > > > - we should add a config there to switch on/off the flags
> in
> > > >> system
> > > >> >> > meta
> > > >> >> > > > fields
> > > >> >> > > >
> > > >> >> > > > What do you think?
> > > >> >> > > >
> > > >> >> > > > Best,
> > > >> >> > > > Danny Chan
> > > >> >> > > >
> > > >> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> > > >> >> > > >
> > > >> >> > > >> >> Oops, the image crushes, for "change flags", i mean:
> > > insert,
> > > >> >> > > >> update(before
> > > >> >> > > >> and after) and delete.
> > > >> >> > > >>
> > > >> >> > > >> Yes, the image I attached is also about these flags.
> > > >> >> > > >> [image: image (3).png]
> > > >> >> > > >>
> > > >> >> > > >> +1 for the idea.
> > > >> >> > > >>
> > > >> >> > > >> Best,
> > > >> >> > > >> Vino
> > > >> >> > > >>
> > > >> >> > > >>
> > > >> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> > > >> >> > > >>
> > > >> >> > > >>> Oops, the image crushes, for "change flags", i mean:
> > insert,
> > > >> >> > > >>> update(before
> > > >> >> > > >>> and after) and delete.
> > > >> >> > > >>>
> > > >> >> > > >>> The Flink engine can propagate the change flags
> internally
> > > >> between
> > > >> >> > its
> > > >> >> > > >>> operators, if HUDI can send the change flags to Flink,
> the
> > > >> >> > incremental
> > > >> >> > > >>> calculation of CDC would be very natural (almost
> > transparent
> > > to
> > > >> >> > users).
> > > >> >> > > >>>
> > > >> >> > > >>> Best,
> > > >> >> > > >>> Danny Chan
> > > >> >> > > >>>
> > > >> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三
> 下午11:32写道:
> > > >> >> > > >>>
> > > >> >> > > >>> > Hi Danny,
> > > >> >> > > >>> >
> > > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > > >> >> > > >>> >
> > > >> >> > > >>> > Yes, incremental query( or says "incremental
> processing")
> > > has
> > > >> >> > always
> > > >> >> > > >>> been
> > > >> >> > > >>> > an important feature of the Hudi framework. If we can
> > make
> > > >> this
> > > >> >> > > feature
> > > >> >> > > >>> > better, it will be even more exciting.
> > > >> >> > > >>> >
> > > >> >> > > >>> > In the data warehouse, in some complex calculations, I
> > have
> > > >> not
> > > >> >> > > found a
> > > >> >> > > >>> > good way to conveniently use some incremental change
> data
> > > >> >> (similar
> > > >> >> > to
> > > >> >> > > >>> the
> > > >> >> > > >>> > concept of retracement stream in Flink?) to locally
> > > "correct"
> > > >> >> the
> > > >> >> > > >>> > aggregation result (these aggregation results may
> belong
> > to
> > > >> the
> > > >> >> DWS
> > > >> >> > > >>> layer).
> > > >> >> > > >>> >
> > > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> > scenarios
> > > >> >> (single
> > > >> >> > > >>> table
> > > >> >> > > >>> > or an algorithm that can be very easily retracement)
> can
> > be
> > > >> >> dealt
> > > >> >> > > with
> > > >> >> > > >>> > based on the incremental calculation of CDC.
> > > >> >> > > >>> >
> > > >> >> > > >>> > Of course, the expression of incremental calculation on
> > > >> various
> > > >> >> > > >>> occasions
> > > >> >> > > >>> > is sometimes not very clear. Maybe we will discuss it
> > more
> > > >> >> clearly
> > > >> >> > in
> > > >> >> > > >>> > specific scenarios.
> > > >> >> > > >>> >
> > > >> >> > > >>> > >> If HUDI can keep and propagate these change flags to
> > its
> > > >> >> > > consumers,
> > > >> >> > > >>> we
> > > >> >> > > >>> > can
> > > >> >> > > >>> > use HUDI as the unified format for the pipeline.
> > > >> >> > > >>> >
> > > >> >> > > >>> > Regarding the "change flags" here, do you mean the
> flags
> > > like
> > > >> >> the
> > > >> >> > one
> > > >> >> > > >>> > shown in the figure below?
> > > >> >> > > >>> >
> > > >> >> > > >>> > [image: image.png]
> > > >> >> > > >>> >
> > > >> >> > > >>> > Best,
> > > >> >> > > >>> > Vino
> > > >> >> > > >>> >
> > > >> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三
> > 下午6:24写道:
> > > >> >> > > >>> >
> > > >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss
> > > about
> > > >> >> using
> > > >> >> > > >>> HUDI as
> > > >> >> > > >>> >> the unified storage/format for data warehouse/lake
> > > >> incremental
> > > >> >> > > >>> >> computation.
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> Usually people divide data warehouse production into
> > > several
> > > >> >> > levels,
> > > >> >> > > >>> such
> > > >> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> > > >> details),
> > > >> >> > > >>> DWS(data
> > > >> >> > > >>> >> warehouse service), ADS(application data service).
> > > >> >> > > >>> >>
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
> > > cases,
> > > >> a
> > > >> >> big
> > > >> >> > > >>> topic is
> > > >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of
> > > RDBMS
> > > >> >> into
> > > >> >> > the
> > > >> >> > > >>> >> warehouse/lake, the cdc patten records and propagate
> the
> > > >> change
> > > >> >> > > flag:
> > > >> >> > > >>> >> insert, update(before and after) and delete for the
> > > >> consumer,
> > > >> >> with
> > > >> >> > > >>> these
> > > >> >> > > >>> >> flags, the downstream engines can have a realtime
> > > >> accumulation
> > > >> >> > > >>> >> computation.
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> Using streaming engine like Flink, we can have a
> totally
> > > >> >> > > >>> NEAR-REAL-TIME
> > > >> >> > > >>> >> computation pipeline for each of the layer.
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> If HUDI can keep and propagate these change flags to
> its
> > > >> >> > consumers,
> > > >> >> > > >>> we can
> > > >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > > >> >> > > >>> >>
> > > >> >> > > >>> >> Best,
> > > >> >> > > >>> >> Danny Chan
> > > >> >> > > >>> >>
> > > >> >> > > >>> >
> > > >> >> > > >>>
> > > >> >> > > >>
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Regards,
> > > >> > -Sivabalan
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Regards,
> > > >> -Sivabalan
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
> Is it providing the ability to author continuous queries on
Hudi source tables end-end,
given Flink can use the flags to generate retract/upsert streams

Yes,that's the key point, with these flags plus flink stateful operators,
we can have a real time incremental ETL pipeline.

For example, a global aggregation that consumes cdc stream can do
acc/retract continuously and send the changes to downstream.
The ETL pipeline with cdc stream generates the same result as the batch
snapshot with the same sql query.

If keeping compatibility is a must with/without the new metadata columns, I
think there is no need to add a config option which brings in
unnecessary overhead. If we do not ensure backward compatibility for new
column, then we should add such a config option and by default
disable it.

Best,
Danny Chan


Vinoth Chandar <vi...@apache.org> 于2021年4月21日周三 上午6:30写道:

> Hi Danny,
>
> Read up on the Flink docs as well.
>
> If we don't actually publish data to the metacolumn, I think the overhead
> is pretty low w.r.t avro/parquet. Both are very good at encoding nulls.
> But, I feel it's worth adding a HoodieWriteConfig to control this and since
> addition of meta columns mostly happens in the handles,
> it may not be as bad ? Happy to suggest more concrete ideas on the PR.
>
> We still need to test backwards compatibility from different engines quite
> early though and make sure there are no surprises.
> Hive, Parquet, Spark, Presto all have their own rules on evolution as well.
> So we need to think this through if/how seamlessly this can be turned on
> for existing tables
>
> As for testing the new column, given Flink is what will be able to consume
> the flags? Can we write a quick unit test using Dynamic tables?
> I am also curious to understand how the flags help the end user ultimately?
> Reading the flink docs, I understand the concepts (coming from a Kafka
> streams world,
> most of it seem familiar), but what exact problem does the flag solve that
> exist today? Is it providing the ability to author continuous queries on
> Hudi source tables end-end,
> given Flink can use the flags to generate retract/upsert streams?
>
> For hard deletes, we still need to do some core work to make it available
> in the incremental query. So there's more to be done here for cracking this
> end-end streaming/continuous ETL vision?
>
> Very exciting stuff!
>
> Thanks
> Vinoth
>
>
>
>
>
> On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <da...@apache.org> wrote:
>
> > Hi, i have created a PR here:
> > https://github.com/apache/hudi/pull/2854/files
> >
> > In the PR i do these changes:
> > 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a config
> > option because i can not find a good way to make the code clean, a
> metadata
> > column is very primitive and a config option would introduce too many
> > changes
> > 2. Modify the write handle to add the column: add operation for append
> > handle but merges the changes for create handle and merge handle
> > 3. the flag is only useful for streaming read, so i also merge the flags
> > for Flink batch reader, Flink streaming reader would emit each record
> with
> > the right cdc operation
> >
> > I did not change any Spark code because i'm not familiar with that, Spark
> > actually can not handle these flags in operators, So by default, the
> > column "_hoodie_cdc_operation", it has a value from the Flink writer.
> >
> > There may also need some unit tests for the new column in the hoodie
> core,
> > but i don't know how to, could you give some help ?
> >
> > Best,
> > Danny Chan
> >
> > Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:
> >
> > > Thanks @Sivabalan ~
> > >
> > > I agree that parquet and log files should keep sync in metadata columns
> > in
> > > case there are confusions
> > > and special handling in some use cases like compaction.
> > >
> > > I also agree add a metadata column is more ease to use for SQL
> > connectors.
> > >
> > > We can add a metadata column named "_hoodie_change_flag" and a config
> > > option to default disable this metadata column, what do you think?
> > >
> > > Best,
> > > Danny Chan
> > >
> > >
> > >
> > > Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
> > >
> > >> wrt changes if we plan to add this only to log files, compaction needs
> > to
> > >> be fixed to omit this column to the minimum.
> > >>
> > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com> wrote:
> > >>
> > >> > Just got a chance to read about dynamic tables. sounds interesting.
> > >> >
> > >> > some thoughts on your questions:
> > >> > - yes, just MOR makes sense.
> > >> > - But adding this new meta column only to avro logs might incur some
> > non
> > >> > trivial changes. Since as of today, schema of avro and base files
> are
> > in
> > >> > sync. If this new col is going to store just 2
> > >> bits(insert/update/delete),
> > >> > we might as well add it to base files as well and keep it simple.
> But
> > we
> > >> > can make it configurable so that only those interested can enable
> this
> > >> new
> > >> > column to hudi dataset.
> > >> > - Wondering, even today we can achieve this by using a transformer,
> to
> > >> set
> > >> > right values to this new column. I mean, users need to add this col
> in
> > >> > their schema when defining the hudi dataset and if the incoming data
> > has
> > >> > right values for this col (using deltastreamer's transformer or by
> > >> explicit
> > >> > means), we don't even need to add this as a meta column. Just saying
> > >> that
> > >> > we can achieve this even today. But if we are looking to integrate
> w/
> > >> SQL
> > >> > DML, then adding this support would be elegant.
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org>
> > >> wrote:
> > >> >
> > >> >> Thanks Vinoth ~
> > >> >>
> > >> >> Here is a document about the notion of 《Flink Dynamic Table》[1] ,
> > every
> > >> >> operator that has accumulate state can handle
> > >> retractions(UPDATE_BEFORE or
> > >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that
> each
> > >> >> operator can consume the CDC format messages in streaming way.
> > >> >>
> > >> >> > Another aspect to think about is, how the new flag can be added
> to
> > >> >> existing
> > >> >> tables and if the schema evolution would be fine.
> > >> >>
> > >> >> That is also my concern, but it's not that bad because adding a new
> > >> column
> > >> >> is still compatible for old schema in Avro.
> > >> >>
> > >> >> [1]
> > >> >>
> > >> >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > >> >>
> > >> >> Best,
> > >> >> Danny Chan
> > >> >>
> > >> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> > >> >>
> > >> >> > Hi,
> > >> >> >
> > >> >> > Is the intent of the flag to convey if an insert delete or update
> > >> >> changed
> > >> >> > the record? If so I would imagine that we do this even for cow
> > >> tables,
> > >> >> > since that also supports a logical notion of a change stream
> using
> > >> the
> > >> >> > commit_time meta field.
> > >> >> >
> > >> >> > You may be right, but I am trying to understand the use case for
> > >> this.
> > >> >> Any
> > >> >> > links/flink docs I can read?
> > >> >> >
> > >> >> > Another aspect to think about is, how the new flag can be added
> to
> > >> >> existing
> > >> >> > tables and if the schema evolution would be fine.
> > >> >> >
> > >> >> > Thanks
> > >> >> > Vinoth
> > >> >> >
> > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org>
> > >> wrote:
> > >> >> >
> > >> >> > > I tries to do a POC for flink locally and it works well, in the
> > PR
> > >> i
> > >> >> add
> > >> >> > a
> > >> >> > > new metadata column named "_hoodie_change_flag", but actually i
> > >> found
> > >> >> > that
> > >> >> > > only log format needs this flag, and the Spark may has no
> ability
> > >> to
> > >> >> > handle
> > >> >> > > the flag for incremental processing yet.
> > >> >> > >
> > >> >> > > So should i add the "_hoodie_change_flag" metadata column, or
> is
> > >> there
> > >> >> > any
> > >> >> > > better solution for this?
> > >> >> > >
> > >> >> > > Best,
> > >> >> > > Danny Chan
> > >> >> > >
> > >> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> > >> >> > >
> > >> >> > > > Thanks cool, then the left questions are:
> > >> >> > > >
> > >> >> > > > - where we record these change, should we add a builtin meta
> > >> field
> > >> >> such
> > >> >> > > as
> > >> >> > > > the _change_flag_ like the other system columns for e.g
> > >> >> > > _hoodie_commit_time
> > >> >> > > > - what kind of table should keep these flags, in my thoughts,
> > we
> > >> >> should
> > >> >> > > > only add these flags for "MERGE_ON_READ" table, and only for
> > AVRO
> > >> >> logs
> > >> >> > > > - we should add a config there to switch on/off the flags in
> > >> system
> > >> >> > meta
> > >> >> > > > fields
> > >> >> > > >
> > >> >> > > > What do you think?
> > >> >> > > >
> > >> >> > > > Best,
> > >> >> > > > Danny Chan
> > >> >> > > >
> > >> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> > >> >> > > >
> > >> >> > > >> >> Oops, the image crushes, for "change flags", i mean:
> > insert,
> > >> >> > > >> update(before
> > >> >> > > >> and after) and delete.
> > >> >> > > >>
> > >> >> > > >> Yes, the image I attached is also about these flags.
> > >> >> > > >> [image: image (3).png]
> > >> >> > > >>
> > >> >> > > >> +1 for the idea.
> > >> >> > > >>
> > >> >> > > >> Best,
> > >> >> > > >> Vino
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> > >> >> > > >>
> > >> >> > > >>> Oops, the image crushes, for "change flags", i mean:
> insert,
> > >> >> > > >>> update(before
> > >> >> > > >>> and after) and delete.
> > >> >> > > >>>
> > >> >> > > >>> The Flink engine can propagate the change flags internally
> > >> between
> > >> >> > its
> > >> >> > > >>> operators, if HUDI can send the change flags to Flink, the
> > >> >> > incremental
> > >> >> > > >>> calculation of CDC would be very natural (almost
> transparent
> > to
> > >> >> > users).
> > >> >> > > >>>
> > >> >> > > >>> Best,
> > >> >> > > >>> Danny Chan
> > >> >> > > >>>
> > >> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> > >> >> > > >>>
> > >> >> > > >>> > Hi Danny,
> > >> >> > > >>> >
> > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > >> >> > > >>> >
> > >> >> > > >>> > Yes, incremental query( or says "incremental processing")
> > has
> > >> >> > always
> > >> >> > > >>> been
> > >> >> > > >>> > an important feature of the Hudi framework. If we can
> make
> > >> this
> > >> >> > > feature
> > >> >> > > >>> > better, it will be even more exciting.
> > >> >> > > >>> >
> > >> >> > > >>> > In the data warehouse, in some complex calculations, I
> have
> > >> not
> > >> >> > > found a
> > >> >> > > >>> > good way to conveniently use some incremental change data
> > >> >> (similar
> > >> >> > to
> > >> >> > > >>> the
> > >> >> > > >>> > concept of retracement stream in Flink?) to locally
> > "correct"
> > >> >> the
> > >> >> > > >>> > aggregation result (these aggregation results may belong
> to
> > >> the
> > >> >> DWS
> > >> >> > > >>> layer).
> > >> >> > > >>> >
> > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> scenarios
> > >> >> (single
> > >> >> > > >>> table
> > >> >> > > >>> > or an algorithm that can be very easily retracement) can
> be
> > >> >> dealt
> > >> >> > > with
> > >> >> > > >>> > based on the incremental calculation of CDC.
> > >> >> > > >>> >
> > >> >> > > >>> > Of course, the expression of incremental calculation on
> > >> various
> > >> >> > > >>> occasions
> > >> >> > > >>> > is sometimes not very clear. Maybe we will discuss it
> more
> > >> >> clearly
> > >> >> > in
> > >> >> > > >>> > specific scenarios.
> > >> >> > > >>> >
> > >> >> > > >>> > >> If HUDI can keep and propagate these change flags to
> its
> > >> >> > > consumers,
> > >> >> > > >>> we
> > >> >> > > >>> > can
> > >> >> > > >>> > use HUDI as the unified format for the pipeline.
> > >> >> > > >>> >
> > >> >> > > >>> > Regarding the "change flags" here, do you mean the flags
> > like
> > >> >> the
> > >> >> > one
> > >> >> > > >>> > shown in the figure below?
> > >> >> > > >>> >
> > >> >> > > >>> > [image: image.png]
> > >> >> > > >>> >
> > >> >> > > >>> > Best,
> > >> >> > > >>> > Vino
> > >> >> > > >>> >
> > >> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三
> 下午6:24写道:
> > >> >> > > >>> >
> > >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss
> > about
> > >> >> using
> > >> >> > > >>> HUDI as
> > >> >> > > >>> >> the unified storage/format for data warehouse/lake
> > >> incremental
> > >> >> > > >>> >> computation.
> > >> >> > > >>> >>
> > >> >> > > >>> >> Usually people divide data warehouse production into
> > several
> > >> >> > levels,
> > >> >> > > >>> such
> > >> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> > >> details),
> > >> >> > > >>> DWS(data
> > >> >> > > >>> >> warehouse service), ADS(application data service).
> > >> >> > > >>> >>
> > >> >> > > >>> >>
> > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > >> >> > > >>> >>
> > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
> > cases,
> > >> a
> > >> >> big
> > >> >> > > >>> topic is
> > >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of
> > RDBMS
> > >> >> into
> > >> >> > the
> > >> >> > > >>> >> warehouse/lake, the cdc patten records and propagate the
> > >> change
> > >> >> > > flag:
> > >> >> > > >>> >> insert, update(before and after) and delete for the
> > >> consumer,
> > >> >> with
> > >> >> > > >>> these
> > >> >> > > >>> >> flags, the downstream engines can have a realtime
> > >> accumulation
> > >> >> > > >>> >> computation.
> > >> >> > > >>> >>
> > >> >> > > >>> >> Using streaming engine like Flink, we can have a totally
> > >> >> > > >>> NEAR-REAL-TIME
> > >> >> > > >>> >> computation pipeline for each of the layer.
> > >> >> > > >>> >>
> > >> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> > >> >> > consumers,
> > >> >> > > >>> we can
> > >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> > >> >> > > >>> >>
> > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > >> >> > > >>> >>
> > >> >> > > >>> >> Best,
> > >> >> > > >>> >> Danny Chan
> > >> >> > > >>> >>
> > >> >> > > >>> >
> > >> >> > > >>>
> > >> >> > > >>
> > >> >> > >
> > >> >> >
> > >> >>
> > >> >
> > >> >
> > >> > --
> > >> > Regards,
> > >> > -Sivabalan
> > >> >
> > >>
> > >>
> > >> --
> > >> Regards,
> > >> -Sivabalan
> > >>
> > >
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Vinoth Chandar <vi...@apache.org>.
Hi Danny,

Read up on the Flink docs as well.

If we don't actually publish data to the metacolumn, I think the overhead
is pretty low w.r.t avro/parquet. Both are very good at encoding nulls.
But, I feel it's worth adding a HoodieWriteConfig to control this and since
addition of meta columns mostly happens in the handles,
it may not be as bad ? Happy to suggest more concrete ideas on the PR.

We still need to test backwards compatibility from different engines quite
early though and make sure there are no surprises.
Hive, Parquet, Spark, Presto all have their own rules on evolution as well.
So we need to think this through if/how seamlessly this can be turned on
for existing tables

As for testing the new column, given Flink is what will be able to consume
the flags? Can we write a quick unit test using Dynamic tables?
I am also curious to understand how the flags help the end user ultimately?
Reading the flink docs, I understand the concepts (coming from a Kafka
streams world,
most of it seem familiar), but what exact problem does the flag solve that
exist today? Is it providing the ability to author continuous queries on
Hudi source tables end-end,
given Flink can use the flags to generate retract/upsert streams?

For hard deletes, we still need to do some core work to make it available
in the incremental query. So there's more to be done here for cracking this
end-end streaming/continuous ETL vision?

Very exciting stuff!

Thanks
Vinoth





On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <da...@apache.org> wrote:

> Hi, i have created a PR here:
> https://github.com/apache/hudi/pull/2854/files
>
> In the PR i do these changes:
> 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a config
> option because i can not find a good way to make the code clean, a metadata
> column is very primitive and a config option would introduce too many
> changes
> 2. Modify the write handle to add the column: add operation for append
> handle but merges the changes for create handle and merge handle
> 3. the flag is only useful for streaming read, so i also merge the flags
> for Flink batch reader, Flink streaming reader would emit each record with
> the right cdc operation
>
> I did not change any Spark code because i'm not familiar with that, Spark
> actually can not handle these flags in operators, So by default, the
> column "_hoodie_cdc_operation", it has a value from the Flink writer.
>
> There may also need some unit tests for the new column in the hoodie core,
> but i don't know how to, could you give some help ?
>
> Best,
> Danny Chan
>
> Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:
>
> > Thanks @Sivabalan ~
> >
> > I agree that parquet and log files should keep sync in metadata columns
> in
> > case there are confusions
> > and special handling in some use cases like compaction.
> >
> > I also agree add a metadata column is more ease to use for SQL
> connectors.
> >
> > We can add a metadata column named "_hoodie_change_flag" and a config
> > option to default disable this metadata column, what do you think?
> >
> > Best,
> > Danny Chan
> >
> >
> >
> > Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
> >
> >> wrt changes if we plan to add this only to log files, compaction needs
> to
> >> be fixed to omit this column to the minimum.
> >>
> >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com> wrote:
> >>
> >> > Just got a chance to read about dynamic tables. sounds interesting.
> >> >
> >> > some thoughts on your questions:
> >> > - yes, just MOR makes sense.
> >> > - But adding this new meta column only to avro logs might incur some
> non
> >> > trivial changes. Since as of today, schema of avro and base files are
> in
> >> > sync. If this new col is going to store just 2
> >> bits(insert/update/delete),
> >> > we might as well add it to base files as well and keep it simple. But
> we
> >> > can make it configurable so that only those interested can enable this
> >> new
> >> > column to hudi dataset.
> >> > - Wondering, even today we can achieve this by using a transformer, to
> >> set
> >> > right values to this new column. I mean, users need to add this col in
> >> > their schema when defining the hudi dataset and if the incoming data
> has
> >> > right values for this col (using deltastreamer's transformer or by
> >> explicit
> >> > means), we don't even need to add this as a meta column. Just saying
> >> that
> >> > we can achieve this even today. But if we are looking to integrate w/
> >> SQL
> >> > DML, then adding this support would be elegant.
> >> >
> >> >
> >> >
> >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org>
> >> wrote:
> >> >
> >> >> Thanks Vinoth ~
> >> >>
> >> >> Here is a document about the notion of 《Flink Dynamic Table》[1] ,
> every
> >> >> operator that has accumulate state can handle
> >> retractions(UPDATE_BEFORE or
> >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
> >> >> operator can consume the CDC format messages in streaming way.
> >> >>
> >> >> > Another aspect to think about is, how the new flag can be added to
> >> >> existing
> >> >> tables and if the schema evolution would be fine.
> >> >>
> >> >> That is also my concern, but it's not that bad because adding a new
> >> column
> >> >> is still compatible for old schema in Avro.
> >> >>
> >> >> [1]
> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> >> >>
> >> >> Best,
> >> >> Danny Chan
> >> >>
> >> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> >> >>
> >> >> > Hi,
> >> >> >
> >> >> > Is the intent of the flag to convey if an insert delete or update
> >> >> changed
> >> >> > the record? If so I would imagine that we do this even for cow
> >> tables,
> >> >> > since that also supports a logical notion of a change stream using
> >> the
> >> >> > commit_time meta field.
> >> >> >
> >> >> > You may be right, but I am trying to understand the use case for
> >> this.
> >> >> Any
> >> >> > links/flink docs I can read?
> >> >> >
> >> >> > Another aspect to think about is, how the new flag can be added to
> >> >> existing
> >> >> > tables and if the schema evolution would be fine.
> >> >> >
> >> >> > Thanks
> >> >> > Vinoth
> >> >> >
> >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org>
> >> wrote:
> >> >> >
> >> >> > > I tries to do a POC for flink locally and it works well, in the
> PR
> >> i
> >> >> add
> >> >> > a
> >> >> > > new metadata column named "_hoodie_change_flag", but actually i
> >> found
> >> >> > that
> >> >> > > only log format needs this flag, and the Spark may has no ability
> >> to
> >> >> > handle
> >> >> > > the flag for incremental processing yet.
> >> >> > >
> >> >> > > So should i add the "_hoodie_change_flag" metadata column, or is
> >> there
> >> >> > any
> >> >> > > better solution for this?
> >> >> > >
> >> >> > > Best,
> >> >> > > Danny Chan
> >> >> > >
> >> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> >> >> > >
> >> >> > > > Thanks cool, then the left questions are:
> >> >> > > >
> >> >> > > > - where we record these change, should we add a builtin meta
> >> field
> >> >> such
> >> >> > > as
> >> >> > > > the _change_flag_ like the other system columns for e.g
> >> >> > > _hoodie_commit_time
> >> >> > > > - what kind of table should keep these flags, in my thoughts,
> we
> >> >> should
> >> >> > > > only add these flags for "MERGE_ON_READ" table, and only for
> AVRO
> >> >> logs
> >> >> > > > - we should add a config there to switch on/off the flags in
> >> system
> >> >> > meta
> >> >> > > > fields
> >> >> > > >
> >> >> > > > What do you think?
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Danny Chan
> >> >> > > >
> >> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> >> >> > > >
> >> >> > > >> >> Oops, the image crushes, for "change flags", i mean:
> insert,
> >> >> > > >> update(before
> >> >> > > >> and after) and delete.
> >> >> > > >>
> >> >> > > >> Yes, the image I attached is also about these flags.
> >> >> > > >> [image: image (3).png]
> >> >> > > >>
> >> >> > > >> +1 for the idea.
> >> >> > > >>
> >> >> > > >> Best,
> >> >> > > >> Vino
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> >> >> > > >>
> >> >> > > >>> Oops, the image crushes, for "change flags", i mean: insert,
> >> >> > > >>> update(before
> >> >> > > >>> and after) and delete.
> >> >> > > >>>
> >> >> > > >>> The Flink engine can propagate the change flags internally
> >> between
> >> >> > its
> >> >> > > >>> operators, if HUDI can send the change flags to Flink, the
> >> >> > incremental
> >> >> > > >>> calculation of CDC would be very natural (almost transparent
> to
> >> >> > users).
> >> >> > > >>>
> >> >> > > >>> Best,
> >> >> > > >>> Danny Chan
> >> >> > > >>>
> >> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> >> >> > > >>>
> >> >> > > >>> > Hi Danny,
> >> >> > > >>> >
> >> >> > > >>> > Thanks for kicking off this discussion thread.
> >> >> > > >>> >
> >> >> > > >>> > Yes, incremental query( or says "incremental processing")
> has
> >> >> > always
> >> >> > > >>> been
> >> >> > > >>> > an important feature of the Hudi framework. If we can make
> >> this
> >> >> > > feature
> >> >> > > >>> > better, it will be even more exciting.
> >> >> > > >>> >
> >> >> > > >>> > In the data warehouse, in some complex calculations, I have
> >> not
> >> >> > > found a
> >> >> > > >>> > good way to conveniently use some incremental change data
> >> >> (similar
> >> >> > to
> >> >> > > >>> the
> >> >> > > >>> > concept of retracement stream in Flink?) to locally
> "correct"
> >> >> the
> >> >> > > >>> > aggregation result (these aggregation results may belong to
> >> the
> >> >> DWS
> >> >> > > >>> layer).
> >> >> > > >>> >
> >> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
> >> >> (single
> >> >> > > >>> table
> >> >> > > >>> > or an algorithm that can be very easily retracement) can be
> >> >> dealt
> >> >> > > with
> >> >> > > >>> > based on the incremental calculation of CDC.
> >> >> > > >>> >
> >> >> > > >>> > Of course, the expression of incremental calculation on
> >> various
> >> >> > > >>> occasions
> >> >> > > >>> > is sometimes not very clear. Maybe we will discuss it more
> >> >> clearly
> >> >> > in
> >> >> > > >>> > specific scenarios.
> >> >> > > >>> >
> >> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
> >> >> > > consumers,
> >> >> > > >>> we
> >> >> > > >>> > can
> >> >> > > >>> > use HUDI as the unified format for the pipeline.
> >> >> > > >>> >
> >> >> > > >>> > Regarding the "change flags" here, do you mean the flags
> like
> >> >> the
> >> >> > one
> >> >> > > >>> > shown in the figure below?
> >> >> > > >>> >
> >> >> > > >>> > [image: image.png]
> >> >> > > >>> >
> >> >> > > >>> > Best,
> >> >> > > >>> > Vino
> >> >> > > >>> >
> >> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> >> >> > > >>> >
> >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss
> about
> >> >> using
> >> >> > > >>> HUDI as
> >> >> > > >>> >> the unified storage/format for data warehouse/lake
> >> incremental
> >> >> > > >>> >> computation.
> >> >> > > >>> >>
> >> >> > > >>> >> Usually people divide data warehouse production into
> several
> >> >> > levels,
> >> >> > > >>> such
> >> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> >> details),
> >> >> > > >>> DWS(data
> >> >> > > >>> >> warehouse service), ADS(application data service).
> >> >> > > >>> >>
> >> >> > > >>> >>
> >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> >> >> > > >>> >>
> >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
> cases,
> >> a
> >> >> big
> >> >> > > >>> topic is
> >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of
> RDBMS
> >> >> into
> >> >> > the
> >> >> > > >>> >> warehouse/lake, the cdc patten records and propagate the
> >> change
> >> >> > > flag:
> >> >> > > >>> >> insert, update(before and after) and delete for the
> >> consumer,
> >> >> with
> >> >> > > >>> these
> >> >> > > >>> >> flags, the downstream engines can have a realtime
> >> accumulation
> >> >> > > >>> >> computation.
> >> >> > > >>> >>
> >> >> > > >>> >> Using streaming engine like Flink, we can have a totally
> >> >> > > >>> NEAR-REAL-TIME
> >> >> > > >>> >> computation pipeline for each of the layer.
> >> >> > > >>> >>
> >> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> >> >> > consumers,
> >> >> > > >>> we can
> >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> >> >> > > >>> >>
> >> >> > > >>> >> I'm expecting your nice ideas here ~
> >> >> > > >>> >>
> >> >> > > >>> >> Best,
> >> >> > > >>> >> Danny Chan
> >> >> > > >>> >>
> >> >> > > >>> >
> >> >> > > >>>
> >> >> > > >>
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >> > --
> >> > Regards,
> >> > -Sivabalan
> >> >
> >>
> >>
> >> --
> >> Regards,
> >> -Sivabalan
> >>
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
Hi, i have created a PR here: https://github.com/apache/hudi/pull/2854/files

In the PR i do these changes:
1. Add a metadata column: "_hoodie_cdc_operation", i did not add a config
option because i can not find a good way to make the code clean, a metadata
column is very primitive and a config option would introduce too many
changes
2. Modify the write handle to add the column: add operation for append
handle but merges the changes for create handle and merge handle
3. the flag is only useful for streaming read, so i also merge the flags
for Flink batch reader, Flink streaming reader would emit each record with
the right cdc operation

I did not change any Spark code because i'm not familiar with that, Spark
actually can not handle these flags in operators, So by default, the
column "_hoodie_cdc_operation", it has a value from the Flink writer.

There may also need some unit tests for the new column in the hoodie core,
but i don't know how to, could you give some help ?

Best,
Danny Chan

Danny Chan <da...@apache.org> 于2021年4月19日周一 下午4:42写道:

> Thanks @Sivabalan ~
>
> I agree that parquet and log files should keep sync in metadata columns in
> case there are confusions
> and special handling in some use cases like compaction.
>
> I also agree add a metadata column is more ease to use for SQL connectors.
>
> We can add a metadata column named "_hoodie_change_flag" and a config
> option to default disable this metadata column, what do you think?
>
> Best,
> Danny Chan
>
>
>
> Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:
>
>> wrt changes if we plan to add this only to log files, compaction needs to
>> be fixed to omit this column to the minimum.
>>
>> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com> wrote:
>>
>> > Just got a chance to read about dynamic tables. sounds interesting.
>> >
>> > some thoughts on your questions:
>> > - yes, just MOR makes sense.
>> > - But adding this new meta column only to avro logs might incur some non
>> > trivial changes. Since as of today, schema of avro and base files are in
>> > sync. If this new col is going to store just 2
>> bits(insert/update/delete),
>> > we might as well add it to base files as well and keep it simple. But we
>> > can make it configurable so that only those interested can enable this
>> new
>> > column to hudi dataset.
>> > - Wondering, even today we can achieve this by using a transformer, to
>> set
>> > right values to this new column. I mean, users need to add this col in
>> > their schema when defining the hudi dataset and if the incoming data has
>> > right values for this col (using deltastreamer's transformer or by
>> explicit
>> > means), we don't even need to add this as a meta column. Just saying
>> that
>> > we can achieve this even today. But if we are looking to integrate w/
>> SQL
>> > DML, then adding this support would be elegant.
>> >
>> >
>> >
>> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org>
>> wrote:
>> >
>> >> Thanks Vinoth ~
>> >>
>> >> Here is a document about the notion of 《Flink Dynamic Table》[1] , every
>> >> operator that has accumulate state can handle
>> retractions(UPDATE_BEFORE or
>> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
>> >> operator can consume the CDC format messages in streaming way.
>> >>
>> >> > Another aspect to think about is, how the new flag can be added to
>> >> existing
>> >> tables and if the schema evolution would be fine.
>> >>
>> >> That is also my concern, but it's not that bad because adding a new
>> column
>> >> is still compatible for old schema in Avro.
>> >>
>> >> [1]
>> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
>> >>
>> >> Best,
>> >> Danny Chan
>> >>
>> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
>> >>
>> >> > Hi,
>> >> >
>> >> > Is the intent of the flag to convey if an insert delete or update
>> >> changed
>> >> > the record? If so I would imagine that we do this even for cow
>> tables,
>> >> > since that also supports a logical notion of a change stream using
>> the
>> >> > commit_time meta field.
>> >> >
>> >> > You may be right, but I am trying to understand the use case for
>> this.
>> >> Any
>> >> > links/flink docs I can read?
>> >> >
>> >> > Another aspect to think about is, how the new flag can be added to
>> >> existing
>> >> > tables and if the schema evolution would be fine.
>> >> >
>> >> > Thanks
>> >> > Vinoth
>> >> >
>> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org>
>> wrote:
>> >> >
>> >> > > I tries to do a POC for flink locally and it works well, in the PR
>> i
>> >> add
>> >> > a
>> >> > > new metadata column named "_hoodie_change_flag", but actually i
>> found
>> >> > that
>> >> > > only log format needs this flag, and the Spark may has no ability
>> to
>> >> > handle
>> >> > > the flag for incremental processing yet.
>> >> > >
>> >> > > So should i add the "_hoodie_change_flag" metadata column, or is
>> there
>> >> > any
>> >> > > better solution for this?
>> >> > >
>> >> > > Best,
>> >> > > Danny Chan
>> >> > >
>> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
>> >> > >
>> >> > > > Thanks cool, then the left questions are:
>> >> > > >
>> >> > > > - where we record these change, should we add a builtin meta
>> field
>> >> such
>> >> > > as
>> >> > > > the _change_flag_ like the other system columns for e.g
>> >> > > _hoodie_commit_time
>> >> > > > - what kind of table should keep these flags, in my thoughts, we
>> >> should
>> >> > > > only add these flags for "MERGE_ON_READ" table, and only for AVRO
>> >> logs
>> >> > > > - we should add a config there to switch on/off the flags in
>> system
>> >> > meta
>> >> > > > fields
>> >> > > >
>> >> > > > What do you think?
>> >> > > >
>> >> > > > Best,
>> >> > > > Danny Chan
>> >> > > >
>> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
>> >> > > >
>> >> > > >> >> Oops, the image crushes, for "change flags", i mean: insert,
>> >> > > >> update(before
>> >> > > >> and after) and delete.
>> >> > > >>
>> >> > > >> Yes, the image I attached is also about these flags.
>> >> > > >> [image: image (3).png]
>> >> > > >>
>> >> > > >> +1 for the idea.
>> >> > > >>
>> >> > > >> Best,
>> >> > > >> Vino
>> >> > > >>
>> >> > > >>
>> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
>> >> > > >>
>> >> > > >>> Oops, the image crushes, for "change flags", i mean: insert,
>> >> > > >>> update(before
>> >> > > >>> and after) and delete.
>> >> > > >>>
>> >> > > >>> The Flink engine can propagate the change flags internally
>> between
>> >> > its
>> >> > > >>> operators, if HUDI can send the change flags to Flink, the
>> >> > incremental
>> >> > > >>> calculation of CDC would be very natural (almost transparent to
>> >> > users).
>> >> > > >>>
>> >> > > >>> Best,
>> >> > > >>> Danny Chan
>> >> > > >>>
>> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
>> >> > > >>>
>> >> > > >>> > Hi Danny,
>> >> > > >>> >
>> >> > > >>> > Thanks for kicking off this discussion thread.
>> >> > > >>> >
>> >> > > >>> > Yes, incremental query( or says "incremental processing") has
>> >> > always
>> >> > > >>> been
>> >> > > >>> > an important feature of the Hudi framework. If we can make
>> this
>> >> > > feature
>> >> > > >>> > better, it will be even more exciting.
>> >> > > >>> >
>> >> > > >>> > In the data warehouse, in some complex calculations, I have
>> not
>> >> > > found a
>> >> > > >>> > good way to conveniently use some incremental change data
>> >> (similar
>> >> > to
>> >> > > >>> the
>> >> > > >>> > concept of retracement stream in Flink?) to locally "correct"
>> >> the
>> >> > > >>> > aggregation result (these aggregation results may belong to
>> the
>> >> DWS
>> >> > > >>> layer).
>> >> > > >>> >
>> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
>> >> (single
>> >> > > >>> table
>> >> > > >>> > or an algorithm that can be very easily retracement) can be
>> >> dealt
>> >> > > with
>> >> > > >>> > based on the incremental calculation of CDC.
>> >> > > >>> >
>> >> > > >>> > Of course, the expression of incremental calculation on
>> various
>> >> > > >>> occasions
>> >> > > >>> > is sometimes not very clear. Maybe we will discuss it more
>> >> clearly
>> >> > in
>> >> > > >>> > specific scenarios.
>> >> > > >>> >
>> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
>> >> > > consumers,
>> >> > > >>> we
>> >> > > >>> > can
>> >> > > >>> > use HUDI as the unified format for the pipeline.
>> >> > > >>> >
>> >> > > >>> > Regarding the "change flags" here, do you mean the flags like
>> >> the
>> >> > one
>> >> > > >>> > shown in the figure below?
>> >> > > >>> >
>> >> > > >>> > [image: image.png]
>> >> > > >>> >
>> >> > > >>> > Best,
>> >> > > >>> > Vino
>> >> > > >>> >
>> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
>> >> > > >>> >
>> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about
>> >> using
>> >> > > >>> HUDI as
>> >> > > >>> >> the unified storage/format for data warehouse/lake
>> incremental
>> >> > > >>> >> computation.
>> >> > > >>> >>
>> >> > > >>> >> Usually people divide data warehouse production into several
>> >> > levels,
>> >> > > >>> such
>> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
>> details),
>> >> > > >>> DWS(data
>> >> > > >>> >> warehouse service), ADS(application data service).
>> >> > > >>> >>
>> >> > > >>> >>
>> >> > > >>> >> ODS -> DWD -> DWS -> ADS
>> >> > > >>> >>
>> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases,
>> a
>> >> big
>> >> > > >>> topic is
>> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS
>> >> into
>> >> > the
>> >> > > >>> >> warehouse/lake, the cdc patten records and propagate the
>> change
>> >> > > flag:
>> >> > > >>> >> insert, update(before and after) and delete for the
>> consumer,
>> >> with
>> >> > > >>> these
>> >> > > >>> >> flags, the downstream engines can have a realtime
>> accumulation
>> >> > > >>> >> computation.
>> >> > > >>> >>
>> >> > > >>> >> Using streaming engine like Flink, we can have a totally
>> >> > > >>> NEAR-REAL-TIME
>> >> > > >>> >> computation pipeline for each of the layer.
>> >> > > >>> >>
>> >> > > >>> >> If HUDI can keep and propagate these change flags to its
>> >> > consumers,
>> >> > > >>> we can
>> >> > > >>> >> use HUDI as the unified format for the pipeline.
>> >> > > >>> >>
>> >> > > >>> >> I'm expecting your nice ideas here ~
>> >> > > >>> >>
>> >> > > >>> >> Best,
>> >> > > >>> >> Danny Chan
>> >> > > >>> >>
>> >> > > >>> >
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > -Sivabalan
>> >
>>
>>
>> --
>> Regards,
>> -Sivabalan
>>
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
Thanks @Sivabalan ~

I agree that parquet and log files should keep sync in metadata columns in
case there are confusions
and special handling in some use cases like compaction.

I also agree add a metadata column is more ease to use for SQL connectors.

We can add a metadata column named "_hoodie_change_flag" and a config
option to default disable this metadata column, what do you think?

Best,
Danny Chan



Sivabalan <n....@gmail.com> 于2021年4月17日周六 上午9:10写道:

> wrt changes if we plan to add this only to log files, compaction needs to
> be fixed to omit this column to the minimum.
>
> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com> wrote:
>
> > Just got a chance to read about dynamic tables. sounds interesting.
> >
> > some thoughts on your questions:
> > - yes, just MOR makes sense.
> > - But adding this new meta column only to avro logs might incur some non
> > trivial changes. Since as of today, schema of avro and base files are in
> > sync. If this new col is going to store just 2
> bits(insert/update/delete),
> > we might as well add it to base files as well and keep it simple. But we
> > can make it configurable so that only those interested can enable this
> new
> > column to hudi dataset.
> > - Wondering, even today we can achieve this by using a transformer, to
> set
> > right values to this new column. I mean, users need to add this col in
> > their schema when defining the hudi dataset and if the incoming data has
> > right values for this col (using deltastreamer's transformer or by
> explicit
> > means), we don't even need to add this as a meta column. Just saying that
> > we can achieve this even today. But if we are looking to integrate w/ SQL
> > DML, then adding this support would be elegant.
> >
> >
> >
> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org>
> wrote:
> >
> >> Thanks Vinoth ~
> >>
> >> Here is a document about the notion of 《Flink Dynamic Table》[1] , every
> >> operator that has accumulate state can handle retractions(UPDATE_BEFORE
> or
> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
> >> operator can consume the CDC format messages in streaming way.
> >>
> >> > Another aspect to think about is, how the new flag can be added to
> >> existing
> >> tables and if the schema evolution would be fine.
> >>
> >> That is also my concern, but it's not that bad because adding a new
> column
> >> is still compatible for old schema in Avro.
> >>
> >> [1]
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> >>
> >> Best,
> >> Danny Chan
> >>
> >> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
> >>
> >> > Hi,
> >> >
> >> > Is the intent of the flag to convey if an insert delete or update
> >> changed
> >> > the record? If so I would imagine that we do this even for cow tables,
> >> > since that also supports a logical notion of a change stream using the
> >> > commit_time meta field.
> >> >
> >> > You may be right, but I am trying to understand the use case for this.
> >> Any
> >> > links/flink docs I can read?
> >> >
> >> > Another aspect to think about is, how the new flag can be added to
> >> existing
> >> > tables and if the schema evolution would be fine.
> >> >
> >> > Thanks
> >> > Vinoth
> >> >
> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org>
> wrote:
> >> >
> >> > > I tries to do a POC for flink locally and it works well, in the PR i
> >> add
> >> > a
> >> > > new metadata column named "_hoodie_change_flag", but actually i
> found
> >> > that
> >> > > only log format needs this flag, and the Spark may has no ability to
> >> > handle
> >> > > the flag for incremental processing yet.
> >> > >
> >> > > So should i add the "_hoodie_change_flag" metadata column, or is
> there
> >> > any
> >> > > better solution for this?
> >> > >
> >> > > Best,
> >> > > Danny Chan
> >> > >
> >> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> >> > >
> >> > > > Thanks cool, then the left questions are:
> >> > > >
> >> > > > - where we record these change, should we add a builtin meta field
> >> such
> >> > > as
> >> > > > the _change_flag_ like the other system columns for e.g
> >> > > _hoodie_commit_time
> >> > > > - what kind of table should keep these flags, in my thoughts, we
> >> should
> >> > > > only add these flags for "MERGE_ON_READ" table, and only for AVRO
> >> logs
> >> > > > - we should add a config there to switch on/off the flags in
> system
> >> > meta
> >> > > > fields
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > > Best,
> >> > > > Danny Chan
> >> > > >
> >> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> >> > > >
> >> > > >> >> Oops, the image crushes, for "change flags", i mean: insert,
> >> > > >> update(before
> >> > > >> and after) and delete.
> >> > > >>
> >> > > >> Yes, the image I attached is also about these flags.
> >> > > >> [image: image (3).png]
> >> > > >>
> >> > > >> +1 for the idea.
> >> > > >>
> >> > > >> Best,
> >> > > >> Vino
> >> > > >>
> >> > > >>
> >> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> >> > > >>
> >> > > >>> Oops, the image crushes, for "change flags", i mean: insert,
> >> > > >>> update(before
> >> > > >>> and after) and delete.
> >> > > >>>
> >> > > >>> The Flink engine can propagate the change flags internally
> between
> >> > its
> >> > > >>> operators, if HUDI can send the change flags to Flink, the
> >> > incremental
> >> > > >>> calculation of CDC would be very natural (almost transparent to
> >> > users).
> >> > > >>>
> >> > > >>> Best,
> >> > > >>> Danny Chan
> >> > > >>>
> >> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> >> > > >>>
> >> > > >>> > Hi Danny,
> >> > > >>> >
> >> > > >>> > Thanks for kicking off this discussion thread.
> >> > > >>> >
> >> > > >>> > Yes, incremental query( or says "incremental processing") has
> >> > always
> >> > > >>> been
> >> > > >>> > an important feature of the Hudi framework. If we can make
> this
> >> > > feature
> >> > > >>> > better, it will be even more exciting.
> >> > > >>> >
> >> > > >>> > In the data warehouse, in some complex calculations, I have
> not
> >> > > found a
> >> > > >>> > good way to conveniently use some incremental change data
> >> (similar
> >> > to
> >> > > >>> the
> >> > > >>> > concept of retracement stream in Flink?) to locally "correct"
> >> the
> >> > > >>> > aggregation result (these aggregation results may belong to
> the
> >> DWS
> >> > > >>> layer).
> >> > > >>> >
> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
> >> (single
> >> > > >>> table
> >> > > >>> > or an algorithm that can be very easily retracement) can be
> >> dealt
> >> > > with
> >> > > >>> > based on the incremental calculation of CDC.
> >> > > >>> >
> >> > > >>> > Of course, the expression of incremental calculation on
> various
> >> > > >>> occasions
> >> > > >>> > is sometimes not very clear. Maybe we will discuss it more
> >> clearly
> >> > in
> >> > > >>> > specific scenarios.
> >> > > >>> >
> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
> >> > > consumers,
> >> > > >>> we
> >> > > >>> > can
> >> > > >>> > use HUDI as the unified format for the pipeline.
> >> > > >>> >
> >> > > >>> > Regarding the "change flags" here, do you mean the flags like
> >> the
> >> > one
> >> > > >>> > shown in the figure below?
> >> > > >>> >
> >> > > >>> > [image: image.png]
> >> > > >>> >
> >> > > >>> > Best,
> >> > > >>> > Vino
> >> > > >>> >
> >> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> >> > > >>> >
> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about
> >> using
> >> > > >>> HUDI as
> >> > > >>> >> the unified storage/format for data warehouse/lake
> incremental
> >> > > >>> >> computation.
> >> > > >>> >>
> >> > > >>> >> Usually people divide data warehouse production into several
> >> > levels,
> >> > > >>> such
> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> details),
> >> > > >>> DWS(data
> >> > > >>> >> warehouse service), ADS(application data service).
> >> > > >>> >>
> >> > > >>> >>
> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> >> > > >>> >>
> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a
> >> big
> >> > > >>> topic is
> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS
> >> into
> >> > the
> >> > > >>> >> warehouse/lake, the cdc patten records and propagate the
> change
> >> > > flag:
> >> > > >>> >> insert, update(before and after) and delete for the consumer,
> >> with
> >> > > >>> these
> >> > > >>> >> flags, the downstream engines can have a realtime
> accumulation
> >> > > >>> >> computation.
> >> > > >>> >>
> >> > > >>> >> Using streaming engine like Flink, we can have a totally
> >> > > >>> NEAR-REAL-TIME
> >> > > >>> >> computation pipeline for each of the layer.
> >> > > >>> >>
> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> >> > consumers,
> >> > > >>> we can
> >> > > >>> >> use HUDI as the unified format for the pipeline.
> >> > > >>> >>
> >> > > >>> >> I'm expecting your nice ideas here ~
> >> > > >>> >>
> >> > > >>> >> Best,
> >> > > >>> >> Danny Chan
> >> > > >>> >>
> >> > > >>> >
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> >
> >
> > --
> > Regards,
> > -Sivabalan
> >
>
>
> --
> Regards,
> -Sivabalan
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Sivabalan <n....@gmail.com>.
wrt changes if we plan to add this only to log files, compaction needs to
be fixed to omit this column to the minimum.

On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n....@gmail.com> wrote:

> Just got a chance to read about dynamic tables. sounds interesting.
>
> some thoughts on your questions:
> - yes, just MOR makes sense.
> - But adding this new meta column only to avro logs might incur some non
> trivial changes. Since as of today, schema of avro and base files are in
> sync. If this new col is going to store just 2 bits(insert/update/delete),
> we might as well add it to base files as well and keep it simple. But we
> can make it configurable so that only those interested can enable this new
> column to hudi dataset.
> - Wondering, even today we can achieve this by using a transformer, to set
> right values to this new column. I mean, users need to add this col in
> their schema when defining the hudi dataset and if the incoming data has
> right values for this col (using deltastreamer's transformer or by explicit
> means), we don't even need to add this as a meta column. Just saying that
> we can achieve this even today. But if we are looking to integrate w/ SQL
> DML, then adding this support would be elegant.
>
>
>
> On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org> wrote:
>
>> Thanks Vinoth ~
>>
>> Here is a document about the notion of 《Flink Dynamic Table》[1] , every
>> operator that has accumulate state can handle retractions(UPDATE_BEFORE or
>> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
>> operator can consume the CDC format messages in streaming way.
>>
>> > Another aspect to think about is, how the new flag can be added to
>> existing
>> tables and if the schema evolution would be fine.
>>
>> That is also my concern, but it's not that bad because adding a new column
>> is still compatible for old schema in Avro.
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
>>
>> Best,
>> Danny Chan
>>
>> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
>>
>> > Hi,
>> >
>> > Is the intent of the flag to convey if an insert delete or update
>> changed
>> > the record? If so I would imagine that we do this even for cow tables,
>> > since that also supports a logical notion of a change stream using the
>> > commit_time meta field.
>> >
>> > You may be right, but I am trying to understand the use case for this.
>> Any
>> > links/flink docs I can read?
>> >
>> > Another aspect to think about is, how the new flag can be added to
>> existing
>> > tables and if the schema evolution would be fine.
>> >
>> > Thanks
>> > Vinoth
>> >
>> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org> wrote:
>> >
>> > > I tries to do a POC for flink locally and it works well, in the PR i
>> add
>> > a
>> > > new metadata column named "_hoodie_change_flag", but actually i found
>> > that
>> > > only log format needs this flag, and the Spark may has no ability to
>> > handle
>> > > the flag for incremental processing yet.
>> > >
>> > > So should i add the "_hoodie_change_flag" metadata column, or is there
>> > any
>> > > better solution for this?
>> > >
>> > > Best,
>> > > Danny Chan
>> > >
>> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
>> > >
>> > > > Thanks cool, then the left questions are:
>> > > >
>> > > > - where we record these change, should we add a builtin meta field
>> such
>> > > as
>> > > > the _change_flag_ like the other system columns for e.g
>> > > _hoodie_commit_time
>> > > > - what kind of table should keep these flags, in my thoughts, we
>> should
>> > > > only add these flags for "MERGE_ON_READ" table, and only for AVRO
>> logs
>> > > > - we should add a config there to switch on/off the flags in system
>> > meta
>> > > > fields
>> > > >
>> > > > What do you think?
>> > > >
>> > > > Best,
>> > > > Danny Chan
>> > > >
>> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
>> > > >
>> > > >> >> Oops, the image crushes, for "change flags", i mean: insert,
>> > > >> update(before
>> > > >> and after) and delete.
>> > > >>
>> > > >> Yes, the image I attached is also about these flags.
>> > > >> [image: image (3).png]
>> > > >>
>> > > >> +1 for the idea.
>> > > >>
>> > > >> Best,
>> > > >> Vino
>> > > >>
>> > > >>
>> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
>> > > >>
>> > > >>> Oops, the image crushes, for "change flags", i mean: insert,
>> > > >>> update(before
>> > > >>> and after) and delete.
>> > > >>>
>> > > >>> The Flink engine can propagate the change flags internally between
>> > its
>> > > >>> operators, if HUDI can send the change flags to Flink, the
>> > incremental
>> > > >>> calculation of CDC would be very natural (almost transparent to
>> > users).
>> > > >>>
>> > > >>> Best,
>> > > >>> Danny Chan
>> > > >>>
>> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
>> > > >>>
>> > > >>> > Hi Danny,
>> > > >>> >
>> > > >>> > Thanks for kicking off this discussion thread.
>> > > >>> >
>> > > >>> > Yes, incremental query( or says "incremental processing") has
>> > always
>> > > >>> been
>> > > >>> > an important feature of the Hudi framework. If we can make this
>> > > feature
>> > > >>> > better, it will be even more exciting.
>> > > >>> >
>> > > >>> > In the data warehouse, in some complex calculations, I have not
>> > > found a
>> > > >>> > good way to conveniently use some incremental change data
>> (similar
>> > to
>> > > >>> the
>> > > >>> > concept of retracement stream in Flink?) to locally "correct"
>> the
>> > > >>> > aggregation result (these aggregation results may belong to the
>> DWS
>> > > >>> layer).
>> > > >>> >
>> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
>> (single
>> > > >>> table
>> > > >>> > or an algorithm that can be very easily retracement) can be
>> dealt
>> > > with
>> > > >>> > based on the incremental calculation of CDC.
>> > > >>> >
>> > > >>> > Of course, the expression of incremental calculation on various
>> > > >>> occasions
>> > > >>> > is sometimes not very clear. Maybe we will discuss it more
>> clearly
>> > in
>> > > >>> > specific scenarios.
>> > > >>> >
>> > > >>> > >> If HUDI can keep and propagate these change flags to its
>> > > consumers,
>> > > >>> we
>> > > >>> > can
>> > > >>> > use HUDI as the unified format for the pipeline.
>> > > >>> >
>> > > >>> > Regarding the "change flags" here, do you mean the flags like
>> the
>> > one
>> > > >>> > shown in the figure below?
>> > > >>> >
>> > > >>> > [image: image.png]
>> > > >>> >
>> > > >>> > Best,
>> > > >>> > Vino
>> > > >>> >
>> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
>> > > >>> >
>> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about
>> using
>> > > >>> HUDI as
>> > > >>> >> the unified storage/format for data warehouse/lake incremental
>> > > >>> >> computation.
>> > > >>> >>
>> > > >>> >> Usually people divide data warehouse production into several
>> > levels,
>> > > >>> such
>> > > >>> >> as the ODS(operation data store), DWD(data warehouse details),
>> > > >>> DWS(data
>> > > >>> >> warehouse service), ADS(application data service).
>> > > >>> >>
>> > > >>> >>
>> > > >>> >> ODS -> DWD -> DWS -> ADS
>> > > >>> >>
>> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a
>> big
>> > > >>> topic is
>> > > >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS
>> into
>> > the
>> > > >>> >> warehouse/lake, the cdc patten records and propagate the change
>> > > flag:
>> > > >>> >> insert, update(before and after) and delete for the consumer,
>> with
>> > > >>> these
>> > > >>> >> flags, the downstream engines can have a realtime accumulation
>> > > >>> >> computation.
>> > > >>> >>
>> > > >>> >> Using streaming engine like Flink, we can have a totally
>> > > >>> NEAR-REAL-TIME
>> > > >>> >> computation pipeline for each of the layer.
>> > > >>> >>
>> > > >>> >> If HUDI can keep and propagate these change flags to its
>> > consumers,
>> > > >>> we can
>> > > >>> >> use HUDI as the unified format for the pipeline.
>> > > >>> >>
>> > > >>> >> I'm expecting your nice ideas here ~
>> > > >>> >>
>> > > >>> >> Best,
>> > > >>> >> Danny Chan
>> > > >>> >>
>> > > >>> >
>> > > >>>
>> > > >>
>> > >
>> >
>>
>
>
> --
> Regards,
> -Sivabalan
>


-- 
Regards,
-Sivabalan

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Sivabalan <n....@gmail.com>.
Just got a chance to read about dynamic tables. sounds interesting.

some thoughts on your questions:
- yes, just MOR makes sense.
- But adding this new meta column only to avro logs might incur some non
trivial changes. Since as of today, schema of avro and base files are in
sync. If this new col is going to store just 2 bits(insert/update/delete),
we might as well add it to base files as well and keep it simple. But we
can make it configurable so that only those interested can enable this new
column to hudi dataset.
- Wondering, even today we can achieve this by using a transformer, to set
right values to this new column. I mean, users need to add this col in
their schema when defining the hudi dataset and if the incoming data has
right values for this col (using deltastreamer's transformer or by explicit
means), we don't even need to add this as a meta column. Just saying that
we can achieve this even today. But if we are looking to integrate w/ SQL
DML, then adding this support would be elegant.



On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <da...@apache.org> wrote:

> Thanks Vinoth ~
>
> Here is a document about the notion of 《Flink Dynamic Table》[1] , every
> operator that has accumulate state can handle retractions(UPDATE_BEFORE or
> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
> operator can consume the CDC format messages in streaming way.
>
> > Another aspect to think about is, how the new flag can be added to
> existing
> tables and if the schema evolution would be fine.
>
> That is also my concern, but it's not that bad because adding a new column
> is still compatible for old schema in Avro.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
>
> Best,
> Danny Chan
>
> Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:
>
> > Hi,
> >
> > Is the intent of the flag to convey if an insert delete or update changed
> > the record? If so I would imagine that we do this even for cow tables,
> > since that also supports a logical notion of a change stream using the
> > commit_time meta field.
> >
> > You may be right, but I am trying to understand the use case for this.
> Any
> > links/flink docs I can read?
> >
> > Another aspect to think about is, how the new flag can be added to
> existing
> > tables and if the schema evolution would be fine.
> >
> > Thanks
> > Vinoth
> >
> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org> wrote:
> >
> > > I tries to do a POC for flink locally and it works well, in the PR i
> add
> > a
> > > new metadata column named "_hoodie_change_flag", but actually i found
> > that
> > > only log format needs this flag, and the Spark may has no ability to
> > handle
> > > the flag for incremental processing yet.
> > >
> > > So should i add the "_hoodie_change_flag" metadata column, or is there
> > any
> > > better solution for this?
> > >
> > > Best,
> > > Danny Chan
> > >
> > > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> > >
> > > > Thanks cool, then the left questions are:
> > > >
> > > > - where we record these change, should we add a builtin meta field
> such
> > > as
> > > > the _change_flag_ like the other system columns for e.g
> > > _hoodie_commit_time
> > > > - what kind of table should keep these flags, in my thoughts, we
> should
> > > > only add these flags for "MERGE_ON_READ" table, and only for AVRO
> logs
> > > > - we should add a config there to switch on/off the flags in system
> > meta
> > > > fields
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Danny Chan
> > > >
> > > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> > > >
> > > >> >> Oops, the image crushes, for "change flags", i mean: insert,
> > > >> update(before
> > > >> and after) and delete.
> > > >>
> > > >> Yes, the image I attached is also about these flags.
> > > >> [image: image (3).png]
> > > >>
> > > >> +1 for the idea.
> > > >>
> > > >> Best,
> > > >> Vino
> > > >>
> > > >>
> > > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> > > >>
> > > >>> Oops, the image crushes, for "change flags", i mean: insert,
> > > >>> update(before
> > > >>> and after) and delete.
> > > >>>
> > > >>> The Flink engine can propagate the change flags internally between
> > its
> > > >>> operators, if HUDI can send the change flags to Flink, the
> > incremental
> > > >>> calculation of CDC would be very natural (almost transparent to
> > users).
> > > >>>
> > > >>> Best,
> > > >>> Danny Chan
> > > >>>
> > > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> > > >>>
> > > >>> > Hi Danny,
> > > >>> >
> > > >>> > Thanks for kicking off this discussion thread.
> > > >>> >
> > > >>> > Yes, incremental query( or says "incremental processing") has
> > always
> > > >>> been
> > > >>> > an important feature of the Hudi framework. If we can make this
> > > feature
> > > >>> > better, it will be even more exciting.
> > > >>> >
> > > >>> > In the data warehouse, in some complex calculations, I have not
> > > found a
> > > >>> > good way to conveniently use some incremental change data
> (similar
> > to
> > > >>> the
> > > >>> > concept of retracement stream in Flink?) to locally "correct" the
> > > >>> > aggregation result (these aggregation results may belong to the
> DWS
> > > >>> layer).
> > > >>> >
> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
> (single
> > > >>> table
> > > >>> > or an algorithm that can be very easily retracement) can be dealt
> > > with
> > > >>> > based on the incremental calculation of CDC.
> > > >>> >
> > > >>> > Of course, the expression of incremental calculation on various
> > > >>> occasions
> > > >>> > is sometimes not very clear. Maybe we will discuss it more
> clearly
> > in
> > > >>> > specific scenarios.
> > > >>> >
> > > >>> > >> If HUDI can keep and propagate these change flags to its
> > > consumers,
> > > >>> we
> > > >>> > can
> > > >>> > use HUDI as the unified format for the pipeline.
> > > >>> >
> > > >>> > Regarding the "change flags" here, do you mean the flags like the
> > one
> > > >>> > shown in the figure below?
> > > >>> >
> > > >>> > [image: image.png]
> > > >>> >
> > > >>> > Best,
> > > >>> > Vino
> > > >>> >
> > > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> > > >>> >
> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about
> using
> > > >>> HUDI as
> > > >>> >> the unified storage/format for data warehouse/lake incremental
> > > >>> >> computation.
> > > >>> >>
> > > >>> >> Usually people divide data warehouse production into several
> > levels,
> > > >>> such
> > > >>> >> as the ODS(operation data store), DWD(data warehouse details),
> > > >>> DWS(data
> > > >>> >> warehouse service), ADS(application data service).
> > > >>> >>
> > > >>> >>
> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > >>> >>
> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a
> big
> > > >>> topic is
> > > >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS into
> > the
> > > >>> >> warehouse/lake, the cdc patten records and propagate the change
> > > flag:
> > > >>> >> insert, update(before and after) and delete for the consumer,
> with
> > > >>> these
> > > >>> >> flags, the downstream engines can have a realtime accumulation
> > > >>> >> computation.
> > > >>> >>
> > > >>> >> Using streaming engine like Flink, we can have a totally
> > > >>> NEAR-REAL-TIME
> > > >>> >> computation pipeline for each of the layer.
> > > >>> >>
> > > >>> >> If HUDI can keep and propagate these change flags to its
> > consumers,
> > > >>> we can
> > > >>> >> use HUDI as the unified format for the pipeline.
> > > >>> >>
> > > >>> >> I'm expecting your nice ideas here ~
> > > >>> >>
> > > >>> >> Best,
> > > >>> >> Danny Chan
> > > >>> >>
> > > >>> >
> > > >>>
> > > >>
> > >
> >
>


-- 
Regards,
-Sivabalan

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
Thanks Vinoth ~

Here is a document about the notion of 《Flink Dynamic Table》[1] , every
operator that has accumulate state can handle retractions(UPDATE_BEFORE or
DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that each
operator can consume the CDC format messages in streaming way.

> Another aspect to think about is, how the new flag can be added to
existing
tables and if the schema evolution would be fine.

That is also my concern, but it's not that bad because adding a new column
is still compatible for old schema in Avro.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html

Best,
Danny Chan

Vinoth Chandar <vi...@apache.org> 于2021年4月16日周五 上午9:44写道:

> Hi,
>
> Is the intent of the flag to convey if an insert delete or update changed
> the record? If so I would imagine that we do this even for cow tables,
> since that also supports a logical notion of a change stream using the
> commit_time meta field.
>
> You may be right, but I am trying to understand the use case for this. Any
> links/flink docs I can read?
>
> Another aspect to think about is, how the new flag can be added to existing
> tables and if the schema evolution would be fine.
>
> Thanks
> Vinoth
>
> On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org> wrote:
>
> > I tries to do a POC for flink locally and it works well, in the PR i add
> a
> > new metadata column named "_hoodie_change_flag", but actually i found
> that
> > only log format needs this flag, and the Spark may has no ability to
> handle
> > the flag for incremental processing yet.
> >
> > So should i add the "_hoodie_change_flag" metadata column, or is there
> any
> > better solution for this?
> >
> > Best,
> > Danny Chan
> >
> > Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
> >
> > > Thanks cool, then the left questions are:
> > >
> > > - where we record these change, should we add a builtin meta field such
> > as
> > > the _change_flag_ like the other system columns for e.g
> > _hoodie_commit_time
> > > - what kind of table should keep these flags, in my thoughts, we should
> > > only add these flags for "MERGE_ON_READ" table, and only for AVRO logs
> > > - we should add a config there to switch on/off the flags in system
> meta
> > > fields
> > >
> > > What do you think?
> > >
> > > Best,
> > > Danny Chan
> > >
> > > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> > >
> > >> >> Oops, the image crushes, for "change flags", i mean: insert,
> > >> update(before
> > >> and after) and delete.
> > >>
> > >> Yes, the image I attached is also about these flags.
> > >> [image: image (3).png]
> > >>
> > >> +1 for the idea.
> > >>
> > >> Best,
> > >> Vino
> > >>
> > >>
> > >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> > >>
> > >>> Oops, the image crushes, for "change flags", i mean: insert,
> > >>> update(before
> > >>> and after) and delete.
> > >>>
> > >>> The Flink engine can propagate the change flags internally between
> its
> > >>> operators, if HUDI can send the change flags to Flink, the
> incremental
> > >>> calculation of CDC would be very natural (almost transparent to
> users).
> > >>>
> > >>> Best,
> > >>> Danny Chan
> > >>>
> > >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> > >>>
> > >>> > Hi Danny,
> > >>> >
> > >>> > Thanks for kicking off this discussion thread.
> > >>> >
> > >>> > Yes, incremental query( or says "incremental processing") has
> always
> > >>> been
> > >>> > an important feature of the Hudi framework. If we can make this
> > feature
> > >>> > better, it will be even more exciting.
> > >>> >
> > >>> > In the data warehouse, in some complex calculations, I have not
> > found a
> > >>> > good way to conveniently use some incremental change data (similar
> to
> > >>> the
> > >>> > concept of retracement stream in Flink?) to locally "correct" the
> > >>> > aggregation result (these aggregation results may belong to the DWS
> > >>> layer).
> > >>> >
> > >>> > BTW: Yes, I do admit that some simple calculation scenarios (single
> > >>> table
> > >>> > or an algorithm that can be very easily retracement) can be dealt
> > with
> > >>> > based on the incremental calculation of CDC.
> > >>> >
> > >>> > Of course, the expression of incremental calculation on various
> > >>> occasions
> > >>> > is sometimes not very clear. Maybe we will discuss it more clearly
> in
> > >>> > specific scenarios.
> > >>> >
> > >>> > >> If HUDI can keep and propagate these change flags to its
> > consumers,
> > >>> we
> > >>> > can
> > >>> > use HUDI as the unified format for the pipeline.
> > >>> >
> > >>> > Regarding the "change flags" here, do you mean the flags like the
> one
> > >>> > shown in the figure below?
> > >>> >
> > >>> > [image: image.png]
> > >>> >
> > >>> > Best,
> > >>> > Vino
> > >>> >
> > >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> > >>> >
> > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about using
> > >>> HUDI as
> > >>> >> the unified storage/format for data warehouse/lake incremental
> > >>> >> computation.
> > >>> >>
> > >>> >> Usually people divide data warehouse production into several
> levels,
> > >>> such
> > >>> >> as the ODS(operation data store), DWD(data warehouse details),
> > >>> DWS(data
> > >>> >> warehouse service), ADS(application data service).
> > >>> >>
> > >>> >>
> > >>> >> ODS -> DWD -> DWS -> ADS
> > >>> >>
> > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big
> > >>> topic is
> > >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS into
> the
> > >>> >> warehouse/lake, the cdc patten records and propagate the change
> > flag:
> > >>> >> insert, update(before and after) and delete for the consumer, with
> > >>> these
> > >>> >> flags, the downstream engines can have a realtime accumulation
> > >>> >> computation.
> > >>> >>
> > >>> >> Using streaming engine like Flink, we can have a totally
> > >>> NEAR-REAL-TIME
> > >>> >> computation pipeline for each of the layer.
> > >>> >>
> > >>> >> If HUDI can keep and propagate these change flags to its
> consumers,
> > >>> we can
> > >>> >> use HUDI as the unified format for the pipeline.
> > >>> >>
> > >>> >> I'm expecting your nice ideas here ~
> > >>> >>
> > >>> >> Best,
> > >>> >> Danny Chan
> > >>> >>
> > >>> >
> > >>>
> > >>
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Vinoth Chandar <vi...@apache.org>.
Hi,

Is the intent of the flag to convey if an insert delete or update changed
the record? If so I would imagine that we do this even for cow tables,
since that also supports a logical notion of a change stream using the
commit_time meta field.

You may be right, but I am trying to understand the use case for this. Any
links/flink docs I can read?

Another aspect to think about is, how the new flag can be added to existing
tables and if the schema evolution would be fine.

Thanks
Vinoth

On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <da...@apache.org> wrote:

> I tries to do a POC for flink locally and it works well, in the PR i add a
> new metadata column named "_hoodie_change_flag", but actually i found that
> only log format needs this flag, and the Spark may has no ability to handle
> the flag for incremental processing yet.
>
> So should i add the "_hoodie_change_flag" metadata column, or is there any
> better solution for this?
>
> Best,
> Danny Chan
>
> Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:
>
> > Thanks cool, then the left questions are:
> >
> > - where we record these change, should we add a builtin meta field such
> as
> > the _change_flag_ like the other system columns for e.g
> _hoodie_commit_time
> > - what kind of table should keep these flags, in my thoughts, we should
> > only add these flags for "MERGE_ON_READ" table, and only for AVRO logs
> > - we should add a config there to switch on/off the flags in system meta
> > fields
> >
> > What do you think?
> >
> > Best,
> > Danny Chan
> >
> > vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> >
> >> >> Oops, the image crushes, for "change flags", i mean: insert,
> >> update(before
> >> and after) and delete.
> >>
> >> Yes, the image I attached is also about these flags.
> >> [image: image (3).png]
> >>
> >> +1 for the idea.
> >>
> >> Best,
> >> Vino
> >>
> >>
> >> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
> >>
> >>> Oops, the image crushes, for "change flags", i mean: insert,
> >>> update(before
> >>> and after) and delete.
> >>>
> >>> The Flink engine can propagate the change flags internally between its
> >>> operators, if HUDI can send the change flags to Flink, the incremental
> >>> calculation of CDC would be very natural (almost transparent to users).
> >>>
> >>> Best,
> >>> Danny Chan
> >>>
> >>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> >>>
> >>> > Hi Danny,
> >>> >
> >>> > Thanks for kicking off this discussion thread.
> >>> >
> >>> > Yes, incremental query( or says "incremental processing") has always
> >>> been
> >>> > an important feature of the Hudi framework. If we can make this
> feature
> >>> > better, it will be even more exciting.
> >>> >
> >>> > In the data warehouse, in some complex calculations, I have not
> found a
> >>> > good way to conveniently use some incremental change data (similar to
> >>> the
> >>> > concept of retracement stream in Flink?) to locally "correct" the
> >>> > aggregation result (these aggregation results may belong to the DWS
> >>> layer).
> >>> >
> >>> > BTW: Yes, I do admit that some simple calculation scenarios (single
> >>> table
> >>> > or an algorithm that can be very easily retracement) can be dealt
> with
> >>> > based on the incremental calculation of CDC.
> >>> >
> >>> > Of course, the expression of incremental calculation on various
> >>> occasions
> >>> > is sometimes not very clear. Maybe we will discuss it more clearly in
> >>> > specific scenarios.
> >>> >
> >>> > >> If HUDI can keep and propagate these change flags to its
> consumers,
> >>> we
> >>> > can
> >>> > use HUDI as the unified format for the pipeline.
> >>> >
> >>> > Regarding the "change flags" here, do you mean the flags like the one
> >>> > shown in the figure below?
> >>> >
> >>> > [image: image.png]
> >>> >
> >>> > Best,
> >>> > Vino
> >>> >
> >>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> >>> >
> >>> >> Hi dear HUDI community ~ Here i want to fire a discuss about using
> >>> HUDI as
> >>> >> the unified storage/format for data warehouse/lake incremental
> >>> >> computation.
> >>> >>
> >>> >> Usually people divide data warehouse production into several levels,
> >>> such
> >>> >> as the ODS(operation data store), DWD(data warehouse details),
> >>> DWS(data
> >>> >> warehouse service), ADS(application data service).
> >>> >>
> >>> >>
> >>> >> ODS -> DWD -> DWS -> ADS
> >>> >>
> >>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big
> >>> topic is
> >>> >> syncing the change log(CDC pattern) from all kinds of RDBMS into the
> >>> >> warehouse/lake, the cdc patten records and propagate the change
> flag:
> >>> >> insert, update(before and after) and delete for the consumer, with
> >>> these
> >>> >> flags, the downstream engines can have a realtime accumulation
> >>> >> computation.
> >>> >>
> >>> >> Using streaming engine like Flink, we can have a totally
> >>> NEAR-REAL-TIME
> >>> >> computation pipeline for each of the layer.
> >>> >>
> >>> >> If HUDI can keep and propagate these change flags to its consumers,
> >>> we can
> >>> >> use HUDI as the unified format for the pipeline.
> >>> >>
> >>> >> I'm expecting your nice ideas here ~
> >>> >>
> >>> >> Best,
> >>> >> Danny Chan
> >>> >>
> >>> >
> >>>
> >>
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
I tries to do a POC for flink locally and it works well, in the PR i add a
new metadata column named "_hoodie_change_flag", but actually i found that
only log format needs this flag, and the Spark may has no ability to handle
the flag for incremental processing yet.

So should i add the "_hoodie_change_flag" metadata column, or is there any
better solution for this?

Best,
Danny Chan

Danny Chan <da...@apache.org> 于2021年4月2日周五 上午11:08写道:

> Thanks cool, then the left questions are:
>
> - where we record these change, should we add a builtin meta field such as
> the _change_flag_ like the other system columns for e.g _hoodie_commit_time
> - what kind of table should keep these flags, in my thoughts, we should
> only add these flags for "MERGE_ON_READ" table, and only for AVRO logs
> - we should add a config there to switch on/off the flags in system meta
> fields
>
> What do you think?
>
> Best,
> Danny Chan
>
> vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:
>
>> >> Oops, the image crushes, for "change flags", i mean: insert,
>> update(before
>> and after) and delete.
>>
>> Yes, the image I attached is also about these flags.
>> [image: image (3).png]
>>
>> +1 for the idea.
>>
>> Best,
>> Vino
>>
>>
>> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
>>
>>> Oops, the image crushes, for "change flags", i mean: insert,
>>> update(before
>>> and after) and delete.
>>>
>>> The Flink engine can propagate the change flags internally between its
>>> operators, if HUDI can send the change flags to Flink, the incremental
>>> calculation of CDC would be very natural (almost transparent to users).
>>>
>>> Best,
>>> Danny Chan
>>>
>>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
>>>
>>> > Hi Danny,
>>> >
>>> > Thanks for kicking off this discussion thread.
>>> >
>>> > Yes, incremental query( or says "incremental processing") has always
>>> been
>>> > an important feature of the Hudi framework. If we can make this feature
>>> > better, it will be even more exciting.
>>> >
>>> > In the data warehouse, in some complex calculations, I have not found a
>>> > good way to conveniently use some incremental change data (similar to
>>> the
>>> > concept of retracement stream in Flink?) to locally "correct" the
>>> > aggregation result (these aggregation results may belong to the DWS
>>> layer).
>>> >
>>> > BTW: Yes, I do admit that some simple calculation scenarios (single
>>> table
>>> > or an algorithm that can be very easily retracement) can be dealt with
>>> > based on the incremental calculation of CDC.
>>> >
>>> > Of course, the expression of incremental calculation on various
>>> occasions
>>> > is sometimes not very clear. Maybe we will discuss it more clearly in
>>> > specific scenarios.
>>> >
>>> > >> If HUDI can keep and propagate these change flags to its consumers,
>>> we
>>> > can
>>> > use HUDI as the unified format for the pipeline.
>>> >
>>> > Regarding the "change flags" here, do you mean the flags like the one
>>> > shown in the figure below?
>>> >
>>> > [image: image.png]
>>> >
>>> > Best,
>>> > Vino
>>> >
>>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
>>> >
>>> >> Hi dear HUDI community ~ Here i want to fire a discuss about using
>>> HUDI as
>>> >> the unified storage/format for data warehouse/lake incremental
>>> >> computation.
>>> >>
>>> >> Usually people divide data warehouse production into several levels,
>>> such
>>> >> as the ODS(operation data store), DWD(data warehouse details),
>>> DWS(data
>>> >> warehouse service), ADS(application data service).
>>> >>
>>> >>
>>> >> ODS -> DWD -> DWS -> ADS
>>> >>
>>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big
>>> topic is
>>> >> syncing the change log(CDC pattern) from all kinds of RDBMS into the
>>> >> warehouse/lake, the cdc patten records and propagate the change flag:
>>> >> insert, update(before and after) and delete for the consumer, with
>>> these
>>> >> flags, the downstream engines can have a realtime accumulation
>>> >> computation.
>>> >>
>>> >> Using streaming engine like Flink, we can have a totally
>>> NEAR-REAL-TIME
>>> >> computation pipeline for each of the layer.
>>> >>
>>> >> If HUDI can keep and propagate these change flags to its consumers,
>>> we can
>>> >> use HUDI as the unified format for the pipeline.
>>> >>
>>> >> I'm expecting your nice ideas here ~
>>> >>
>>> >> Best,
>>> >> Danny Chan
>>> >>
>>> >
>>>
>>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
Thanks cool, then the left questions are:

- where we record these change, should we add a builtin meta field such as
the _change_flag_ like the other system columns for e.g _hoodie_commit_time
- what kind of table should keep these flags, in my thoughts, we should
only add these flags for "MERGE_ON_READ" table, and only for AVRO logs
- we should add a config there to switch on/off the flags in system meta
fields

What do you think?

Best,
Danny Chan

vino yang <ya...@gmail.com> 于2021年4月1日周四 上午10:58写道:

> >> Oops, the image crushes, for "change flags", i mean: insert,
> update(before
> and after) and delete.
>
> Yes, the image I attached is also about these flags.
> [image: image (3).png]
>
> +1 for the idea.
>
> Best,
> Vino
>
>
> Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:
>
>> Oops, the image crushes, for "change flags", i mean: insert, update(before
>> and after) and delete.
>>
>> The Flink engine can propagate the change flags internally between its
>> operators, if HUDI can send the change flags to Flink, the incremental
>> calculation of CDC would be very natural (almost transparent to users).
>>
>> Best,
>> Danny Chan
>>
>> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
>>
>> > Hi Danny,
>> >
>> > Thanks for kicking off this discussion thread.
>> >
>> > Yes, incremental query( or says "incremental processing") has always
>> been
>> > an important feature of the Hudi framework. If we can make this feature
>> > better, it will be even more exciting.
>> >
>> > In the data warehouse, in some complex calculations, I have not found a
>> > good way to conveniently use some incremental change data (similar to
>> the
>> > concept of retracement stream in Flink?) to locally "correct" the
>> > aggregation result (these aggregation results may belong to the DWS
>> layer).
>> >
>> > BTW: Yes, I do admit that some simple calculation scenarios (single
>> table
>> > or an algorithm that can be very easily retracement) can be dealt with
>> > based on the incremental calculation of CDC.
>> >
>> > Of course, the expression of incremental calculation on various
>> occasions
>> > is sometimes not very clear. Maybe we will discuss it more clearly in
>> > specific scenarios.
>> >
>> > >> If HUDI can keep and propagate these change flags to its consumers,
>> we
>> > can
>> > use HUDI as the unified format for the pipeline.
>> >
>> > Regarding the "change flags" here, do you mean the flags like the one
>> > shown in the figure below?
>> >
>> > [image: image.png]
>> >
>> > Best,
>> > Vino
>> >
>> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
>> >
>> >> Hi dear HUDI community ~ Here i want to fire a discuss about using
>> HUDI as
>> >> the unified storage/format for data warehouse/lake incremental
>> >> computation.
>> >>
>> >> Usually people divide data warehouse production into several levels,
>> such
>> >> as the ODS(operation data store), DWD(data warehouse details), DWS(data
>> >> warehouse service), ADS(application data service).
>> >>
>> >>
>> >> ODS -> DWD -> DWS -> ADS
>> >>
>> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big
>> topic is
>> >> syncing the change log(CDC pattern) from all kinds of RDBMS into the
>> >> warehouse/lake, the cdc patten records and propagate the change flag:
>> >> insert, update(before and after) and delete for the consumer, with
>> these
>> >> flags, the downstream engines can have a realtime accumulation
>> >> computation.
>> >>
>> >> Using streaming engine like Flink, we can have a totally NEAR-REAL-TIME
>> >> computation pipeline for each of the layer.
>> >>
>> >> If HUDI can keep and propagate these change flags to its consumers, we
>> can
>> >> use HUDI as the unified format for the pipeline.
>> >>
>> >> I'm expecting your nice ideas here ~
>> >>
>> >> Best,
>> >> Danny Chan
>> >>
>> >
>>
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by vino yang <ya...@gmail.com>.
>> Oops, the image crushes, for "change flags", i mean: insert,
update(before
and after) and delete.

Yes, the image I attached is also about these flags.
[image: image (3).png]

+1 for the idea.

Best,
Vino


Danny Chan <da...@apache.org> 于2021年4月1日周四 上午10:03写道:

> Oops, the image crushes, for "change flags", i mean: insert, update(before
> and after) and delete.
>
> The Flink engine can propagate the change flags internally between its
> operators, if HUDI can send the change flags to Flink, the incremental
> calculation of CDC would be very natural (almost transparent to users).
>
> Best,
> Danny Chan
>
> vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:
>
> > Hi Danny,
> >
> > Thanks for kicking off this discussion thread.
> >
> > Yes, incremental query( or says "incremental processing") has always been
> > an important feature of the Hudi framework. If we can make this feature
> > better, it will be even more exciting.
> >
> > In the data warehouse, in some complex calculations, I have not found a
> > good way to conveniently use some incremental change data (similar to the
> > concept of retracement stream in Flink?) to locally "correct" the
> > aggregation result (these aggregation results may belong to the DWS
> layer).
> >
> > BTW: Yes, I do admit that some simple calculation scenarios (single table
> > or an algorithm that can be very easily retracement) can be dealt with
> > based on the incremental calculation of CDC.
> >
> > Of course, the expression of incremental calculation on various occasions
> > is sometimes not very clear. Maybe we will discuss it more clearly in
> > specific scenarios.
> >
> > >> If HUDI can keep and propagate these change flags to its consumers, we
> > can
> > use HUDI as the unified format for the pipeline.
> >
> > Regarding the "change flags" here, do you mean the flags like the one
> > shown in the figure below?
> >
> > [image: image.png]
> >
> > Best,
> > Vino
> >
> > Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
> >
> >> Hi dear HUDI community ~ Here i want to fire a discuss about using HUDI
> as
> >> the unified storage/format for data warehouse/lake incremental
> >> computation.
> >>
> >> Usually people divide data warehouse production into several levels,
> such
> >> as the ODS(operation data store), DWD(data warehouse details), DWS(data
> >> warehouse service), ADS(application data service).
> >>
> >>
> >> ODS -> DWD -> DWS -> ADS
> >>
> >> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big topic
> is
> >> syncing the change log(CDC pattern) from all kinds of RDBMS into the
> >> warehouse/lake, the cdc patten records and propagate the change flag:
> >> insert, update(before and after) and delete for the consumer, with these
> >> flags, the downstream engines can have a realtime accumulation
> >> computation.
> >>
> >> Using streaming engine like Flink, we can have a totally NEAR-REAL-TIME
> >> computation pipeline for each of the layer.
> >>
> >> If HUDI can keep and propagate these change flags to its consumers, we
> can
> >> use HUDI as the unified format for the pipeline.
> >>
> >> I'm expecting your nice ideas here ~
> >>
> >> Best,
> >> Danny Chan
> >>
> >
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by Danny Chan <da...@apache.org>.
Oops, the image crushes, for "change flags", i mean: insert, update(before
and after) and delete.

The Flink engine can propagate the change flags internally between its
operators, if HUDI can send the change flags to Flink, the incremental
calculation of CDC would be very natural (almost transparent to users).

Best,
Danny Chan

vino yang <ya...@gmail.com> 于2021年3月31日周三 下午11:32写道:

> Hi Danny,
>
> Thanks for kicking off this discussion thread.
>
> Yes, incremental query( or says "incremental processing") has always been
> an important feature of the Hudi framework. If we can make this feature
> better, it will be even more exciting.
>
> In the data warehouse, in some complex calculations, I have not found a
> good way to conveniently use some incremental change data (similar to the
> concept of retracement stream in Flink?) to locally "correct" the
> aggregation result (these aggregation results may belong to the DWS layer).
>
> BTW: Yes, I do admit that some simple calculation scenarios (single table
> or an algorithm that can be very easily retracement) can be dealt with
> based on the incremental calculation of CDC.
>
> Of course, the expression of incremental calculation on various occasions
> is sometimes not very clear. Maybe we will discuss it more clearly in
> specific scenarios.
>
> >> If HUDI can keep and propagate these change flags to its consumers, we
> can
> use HUDI as the unified format for the pipeline.
>
> Regarding the "change flags" here, do you mean the flags like the one
> shown in the figure below?
>
> [image: image.png]
>
> Best,
> Vino
>
> Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:
>
>> Hi dear HUDI community ~ Here i want to fire a discuss about using HUDI as
>> the unified storage/format for data warehouse/lake incremental
>> computation.
>>
>> Usually people divide data warehouse production into several levels, such
>> as the ODS(operation data store), DWD(data warehouse details), DWS(data
>> warehouse service), ADS(application data service).
>>
>>
>> ODS -> DWD -> DWS -> ADS
>>
>> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big topic is
>> syncing the change log(CDC pattern) from all kinds of RDBMS into the
>> warehouse/lake, the cdc patten records and propagate the change flag:
>> insert, update(before and after) and delete for the consumer, with these
>> flags, the downstream engines can have a realtime accumulation
>> computation.
>>
>> Using streaming engine like Flink, we can have a totally NEAR-REAL-TIME
>> computation pipeline for each of the layer.
>>
>> If HUDI can keep and propagate these change flags to its consumers, we can
>> use HUDI as the unified format for the pipeline.
>>
>> I'm expecting your nice ideas here ~
>>
>> Best,
>> Danny Chan
>>
>

Re: [DISCUSS] Incremental computation pipeline for HUDI

Posted by vino yang <ya...@gmail.com>.
Hi Danny,

Thanks for kicking off this discussion thread.

Yes, incremental query( or says "incremental processing") has always been
an important feature of the Hudi framework. If we can make this feature
better, it will be even more exciting.

In the data warehouse, in some complex calculations, I have not found a
good way to conveniently use some incremental change data (similar to the
concept of retracement stream in Flink?) to locally "correct" the
aggregation result (these aggregation results may belong to the DWS layer).

BTW: Yes, I do admit that some simple calculation scenarios (single table
or an algorithm that can be very easily retracement) can be dealt with
based on the incremental calculation of CDC.

Of course, the expression of incremental calculation on various occasions
is sometimes not very clear. Maybe we will discuss it more clearly in
specific scenarios.

>> If HUDI can keep and propagate these change flags to its consumers, we
can
use HUDI as the unified format for the pipeline.

Regarding the "change flags" here, do you mean the flags like the one shown
in the figure below?

[image: image.png]

Best,
Vino

Danny Chan <da...@apache.org> 于2021年3月31日周三 下午6:24写道:

> Hi dear HUDI community ~ Here i want to fire a discuss about using HUDI as
> the unified storage/format for data warehouse/lake incremental computation.
>
> Usually people divide data warehouse production into several levels, such
> as the ODS(operation data store), DWD(data warehouse details), DWS(data
> warehouse service), ADS(application data service).
>
>
> ODS -> DWD -> DWS -> ADS
>
> In the NEAR-REAL-TIME (or pure realtime) computation cases, a big topic is
> syncing the change log(CDC pattern) from all kinds of RDBMS into the
> warehouse/lake, the cdc patten records and propagate the change flag:
> insert, update(before and after) and delete for the consumer, with these
> flags, the downstream engines can have a realtime accumulation computation.
>
> Using streaming engine like Flink, we can have a totally NEAR-REAL-TIME
> computation pipeline for each of the layer.
>
> If HUDI can keep and propagate these change flags to its consumers, we can
> use HUDI as the unified format for the pipeline.
>
> I'm expecting your nice ideas here ~
>
> Best,
> Danny Chan
>