You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@hudi.apache.org by Vinoth Chandar <vi...@apache.org> on 2021/02/10 01:42:31 UTC

Re: [DISCUSS] Improve data locality during ingestion

Hi,

We already support a user defined custom partitioner for bulk insert.
So you can actually control it whichever way you like, for the
initial load.

Thanks
Vinoth

On Tue, Feb 9, 2021 at 5:36 PM Rubens Rodrigues <ru...@gmail.com>
wrote:

> Hi guys,
>
> Talking about my use case...
>
> I have datasets that ordering data by date makes a lot sense or ordering by
> some id to have less touched files on merge operations.
> On my use of delta lake I used to bootstrap tables ever ordering by one of
> these fields and helps a lot on file pruning.
>
> Hudi clustering do this job but I think it is an unnecessary extra step to
> do after bulk insert because all data will need to be rewrite again.
>
>
>
> Em ter, 9 de fev de 2021 21:53, Vinoth Chandar <vi...@apache.org>
> escreveu:
>
> > Hi Satish,
> >
> > Been to respond to this. I think I like the idea overall.
> >
> > Here's a (hopefully) my understanding version and let me know if I am
> > getting this right.
> >
> > Predominantly, we are just talking about the problem of: where do we send
> > the "inserts" to.
> >
> > Today the upsert partitioner does the file sizing/bin-packing etc for
> > inserts and then sends some inserts over to existing file groups to
> > maintain file size.
> > We can abstract all of this into strategies and some kind of pipeline
> > abstractions and have it also consider "affinity" to an existing file
> group
> > based
> > on say information stored in the metadata table?
> >
> > I think this is complimentary to what we do today and can be helpful.
> First
> > thing may be is to abstract the existing write pipeline as a series of
> > "optimization"
> > stages and bring things like file sizing under that.
> >
> > On bucketing, I am not against Hive bucketing or anything. But with
> record
> > level indexes and granular/micro partitions that we can achieve using
> > clustering, is it still the most efficient design? That's a question I
> > would love to find answers for. I never liked the static/hash
> partitioning
> > based schemes
> > in bucketing. they introduce  a lot of manual data munging, if things
> > change.
> >
> > Thanks
> > Vinoth
> >
> >
> >
> > On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishkotha@uber.com.invalid
> >
> > wrote:
> >
> > > I got some feedback that this thread may be a bit complex to
> understand.
> > So
> > > I tried to simplify proposal to below:
> > >
> > > Users can already specify 'partitionpath' using this config
> > > <
> > >
> >
> https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY
> > > >
> > > when
> > > writing data. My proposal is we also give users the ability to identify
> > (or
> > > hint at) 'fileId' to while writing the data. For example, users can
> > > say 'locality.columns:
> > > session_id'. We deterministically map every session_id to a specific
> > > fileGroup in hudi (using hash-modulo or range-partitioning etc). So all
> > > values for a session_id are co-located in the same data/log file.
> > >
> > > Hopefully, this explains the idea better. Appreciate any feedback.
> > >
> > > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <sa...@uber.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is
> a
> > > > great feature for improving data locality. But it has a (relatively
> > big)
> > > > cost to rewrite the data after ingestion. I think there are other
> ways
> > to
> > > > improve data locality during ingestion. For example, we can add a new
> > > Index
> > > > (or partitioner) that reads values for columns that are important
> from
> > a
> > > > data locality perspective. We could then compute hash modulo on the
> > value
> > > > and use that to deterministically identify the file group that the
> > record
> > > > has to be written into.
> > > >
> > > > More detailed example:
> > > > Assume we introduce 2 new config:
> > > > hoodie.datasource.write.num.file.groups: "N" #Controls the total
> number
> > > of
> > > > file Ids allowed (per partition).
> > > >
> > > > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > > #Identify
> > > > columns that are important for data locality.
> > > >
> > > > (I can come up with better names for config if the general idea
> sounds
> > > > good).
> > > >
> > > > During ingestion, we generate 'N' fileIds for each partition (if that
> > > > partition has already K fileIds, we generate N-K new fileIds). Let's
> > say
> > > > these fileIds are stored in fileIdList data structure. For each row,
> we
> > > > compute 'hash(row.get(session_id)+row.get(timestamp)) % N'.  This
> value
> > > is
> > > > used as the index into fileIdList data structure to deterministically
> > > > identify the file group for the row.
> > > >
> > > > This improves data locality by ensuring columns with a given value
> are
> > > > stored in the same file. This hashing could be done in two places:
> > > > 1) A custom index that tags location for each row based on values for
> > > > 'session_id+timestamp'.
> > > > 2) In a new partitioner that assigns buckets for each row based of
> > values
> > > > for 'session_id+timestamp'
> > > >
> > > > *Advantages:*
> > > > 1) No need to rewrite data for improving data locality.
> > > > 2) Integrates well with hive bucketing (spark is also adding support
> > for
> > > > hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>)
> > > > 3) This reduces scan cycles to find a particular key because this
> > ensures
> > > > that the key is present in a certain fileId. Similarly, joining
> across
> > > > multiple tables would be efficient if they both choose the same
> > > > 'locality.columns'.
> > > >
> > > > *Disadvantages:*
> > > > 1) Users need to know the total number of filegroups to generate per
> > > > partition. This value is assumed to be static for all partitions. So
> if
> > > > significant changes are expected in traffic volume, this may not
> > > partition
> > > > the data well.  (We could also consider making this static per
> > partition,
> > > > which adds additional complexity, but feasible to do)
> > > > 2) This may not be as efficient as clustering. For example, data for
> a
> > > > given column value is guaranteed to be co-located in the same file.
> > But
> > > > they may not be in the same block (row group in parquet).  So more
> > blocks
> > > > need to be read by query engines.
> > > >
> > > >
> > > > Clustering can still be useful for other use cases such as stitching
> > > > files, transforming data for efficiency etc. Clustering can also be
> > > useful
> > > > for a few sorting scenarios - e.g., if users cannot predict a good
> > value
> > > > for the number of file groups needed.
> > > >
> > > > Appreciate any feedback. Let me know if you have other ideas on
> > improving
> > > > data locality. If you are interested in this idea and want to
> > > collaborate,
> > > > please reach out.
> > > >
> > > > Thanks
> > > > Satish
> > > >
> > >
> >
>