Posted to commits@hudi.apache.org by "vinothchandar (via GitHub)" <gi...@apache.org> on 2023/03/04 04:09:11 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #7907: [HUDI-5672][RFC-61] Lockless multi writer support

vinothchandar commented on code in PR #7907:
URL: https://github.com/apache/hudi/pull/7907#discussion_r1122538301


##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.

Review Comment:
   compaction or merge on read for queries.



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers must be distributed into the same bucket, not only for UPSERTs but also for all new INSERTs.
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not rolled back by the other active writers.
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and a log file rolls over to a new version number.
+The pivotal thing to note here is that the write_token must be unique across writers for log files with the same version and the same base instant time,

Review Comment:
   Can't the write_token be the write commit_time? We can make collisions highly improbable using some suffix/hash if needed.
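   A minimal sketch of that idea, assuming the delta commit time is available to the writer when it rolls a log file; `WriteTokens.uniqueWriteToken` is a hypothetical helper, not Hudi's actual API:

   ```java
   import java.security.SecureRandom;

   // Sketch: build a collision-resistant write token from the delta commit
   // time plus a short random hex suffix, as suggested above.
   public final class WriteTokens {
     private static final SecureRandom RANDOM = new SecureRandom();

     public static String uniqueWriteToken(String deltaCommitTime) {
       byte[] bytes = new byte[4];
       RANDOM.nextBytes(bytes);
       StringBuilder suffix = new StringBuilder();
       for (byte b : bytes) {
         suffix.append(String.format("%02x", b));
       }
       // e.g. "20230304040911" -> "20230304040911-9f3a1c2e"
       return deltaCommitTime + "-" + suffix;
     }
   }
   ```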



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority,

Review Comment:
   whatever we do, would be nice to generalize across indexes.



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers must be distributed into the same bucket, not only for UPSERTs but also for all new INSERTs.
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not rolled back by the other active writers.
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and a log file rolls over to a new version number.
+The pivotal thing to note here is that the write_token must be unique across writers for log files with the same version and the same base instant time,
+so that the file names of the different writers do not conflict.
+
+The log files generated by a single writer still preserve their sequence through the version number, which is important when the natural order of a single writer's events is needed.
+
+![multi-writer](multi_writer.png)
+
+#### The Compaction Procedure
+
+The compaction service is the component that actually resolves the conflicts. Within a file group, it sorts the files and then merges all the record payloads for each record key.
+The event-time sequence is respected by combining the payloads using the event-time field provided by the payload (known as the `preCombine` field in Hudi).

Review Comment:
   +1. I wonder if we need arrival-time based combining as well, i.e. instant time.
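   As a minimal sketch of the event-time combining described in the diff (arrival-time combining would compare instant times at the marked spot instead), using a simplified stand-in payload class rather than Hudi's actual payload API:

   ```java
   // Sketch of event-time based combining during compaction: the payload
   // with the larger `preCombine` (event time) value wins.
   public final class EventTimeCombine {

     static final class Payload {
       final long eventTime; // the `preCombine` field carried by the payload
       final byte[] value;

       Payload(long eventTime, byte[] value) {
         this.eventTime = eventTime;
         this.value = value;
       }
     }

     static Payload combine(Payload current, Payload incoming) {
       // Later event time wins, regardless of which writer flushed first;
       // arrival-time combining would compare instant times here instead.
       return incoming.eventTime >= current.eventTime ? incoming : current;
     }
   }
   ```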



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers must be distributed into the same bucket, not only for UPSERTs but also for all new INSERTs.
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not rolled back by the other active writers.
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and a log file rolls over to a new version number.
+The pivotal thing to note here is that the write_token must be unique across writers for log files with the same version and the same base instant time,
+so that the file names of the different writers do not conflict.
+
+The log files generated by a single writer still preserve their sequence through the version number, which is important when the natural order of a single writer's events is needed.
+
+![multi-writer](multi_writer.png)
+
+#### The Compaction Procedure
+
+The compaction service is the component that actually resolves the conflicts. Within a file group, it sorts the files and then merges all the record payloads for each record key.
+The event-time sequence is respected by combining the payloads using the event-time field provided by the payload (known as the `preCombine` field in Hudi).
+
+![compaction procedure](compaction.png)
+
+### The Unique Ids for Different Writers
+
+In order to avoid file name conflicts between different writers, we introduce a new write config option: `hoodie.write.client.id`.

Review Comment:
   Can this be auto-generated? Would love to avoid another config for this.
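   For illustration, one way the fallback could look, assuming the configured value may be absent; `resolveClientId` is a hypothetical helper, not part of Hudi:

   ```java
   import java.util.UUID;

   // Sketch: auto-generate a client id when `hoodie.write.client.id`
   // is not set, so users need not supply another config.
   public final class ClientIds {
     static String resolveClientId(String configuredId) {
       if (configuredId != null && !configuredId.isEmpty()) {
         return configuredId;
       }
       // A short random UUID fragment keeps writers' file names distinct.
       return UUID.randomUUID().toString().substring(0, 8);
     }
   }
   ```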



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers must be distributed into the same bucket, not only for UPSERTs but also for all new INSERTs.
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not rolled back by the other active writers.
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and a log file rolls over to a new version number.
+The pivotal thing to note here is that the write_token must be unique across writers for log files with the same version and the same base instant time,
+so that the file names of the different writers do not conflict.
+
+The log files generated by a single writer still preserve their sequence through the version number, which is important when the natural order of a single writer's events is needed.
+
+![multi-writer](multi_writer.png)
+
+#### The Compaction Procedure
+
+The compaction service is the component that actually resolves the conflicts. Within a file group, it sorts the files and then merges all the record payloads for each record key.
+The event-time sequence is respected by combining the payloads using the event-time field provided by the payload (known as the `preCombine` field in Hudi).
+
+![compaction procedure](compaction.png)
+
+### The Unique Ids for Different Writers
+
+In order to avoid file name conflicts between different writers, we introduce a new write config option: `hoodie.write.client.id`.
+The option value becomes part of the task write token, so that tasks from different writers always differ in file name.
+Finally, the log file name follows this pattern:
+
+```shell
+${uuid}_${instant}.log.${version}_${task_token}-${client_id}

Review Comment:
   Again, won't the deltacommit time work as `task_token`?
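   For reference, a minimal sketch that composes a log file name following the pattern in the diff; the helper and its parameter names are hypothetical, not Hudi's API:

   ```java
   // Sketch: compose a log file name following
   // ${uuid}_${instant}.log.${version}_${task_token}-${client_id}.
   public final class LogFileNames {
     static String logFileName(String fileId, String instant, int version,
                               String taskToken, String clientId) {
       return String.format("%s_%s.log.%d_%s-%s",
           fileId, instant, version, taskToken, clientId);
     }
   }
   ```

   With this scheme, two writers sharing the same file group, instant, and version still produce distinct names as long as their client ids differ.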



##########
rfc/rfc-61/rfc-61.md:
##########
@@ -0,0 +1,98 @@
+# RFC-61: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at very high frequency.
+To expand on that: with the hashing index, all writers use the same deterministic hashing algorithm to distribute records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so with multiple writers a conflict is very likely to happen, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load-rebalance strategy that writes into the **small** buckets with higher priority;
+that means multiple writers are prone to write into the same **small** buckets at the same time, and that is how conflicts happen.
+
+In general, for multi-writer streaming ingestion, explicit locking is hard to put into production. In this RFC, we propose a lockless solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: there is no complexity of pipeline orchestration, and the write workload is smoother.
+Most of the raw data sets we handle today are generated continuously, in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion arise. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table;
+the Hudi table then becomes a UNION view of all the input data sets. This is a very common use case because, in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is joining data sets:
+engines like Flink have basic support for all kinds of SQL JOINs, but they store the input records in their internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement an N-way data source join in real time! Hudi takes care of the payload join during the compaction procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts on the same keys by respecting the event-time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers must be distributed into the same bucket, not only for UPSERTs but also for all new INSERTs.

Review Comment:
   The insert problem can be handled with a more expensive conflict resolution strategy that incrementally checks for PK violations. Let me think about this more and sketch a solution if I can.
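   For illustration, a minimal sketch of such a deterministic bucketing function, under the simplifying assumption of a fixed table-wide bucket count:

   ```java
   // Sketch: any writer applying the same hash function and bucket count
   // maps a given record key to the same bucket, for INSERTs and UPSERTs.
   public final class Buckets {
     static int bucketId(String recordKey, int numBuckets) {
       // Mask the sign bit so the modulo result is always non-negative.
       return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
     }
   }
   ```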



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org