Posted to commits@hudi.apache.org by "parisni (via GitHub)" <gi...@apache.org> on 2023/03/17 19:41:30 UTC

[GitHub] [hudi] parisni opened a new issue, #8222: [SUPPORT] Incremental read with MOR does not work as COW

parisni opened a new issue, #8222:
URL: https://github.com/apache/hudi/issues/8222

   Tested on 0.12 and 0.13 with Spark 3.2.1.
   
   COW and MOR tables do not return the same result for an incremental read. Here is reproducible source code plus the output for both.
   
   For the last commit,
   MOR returns:
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   ```
   while COW returns:
   ```
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   ```
   COW is right and MOR is wrong: the precombine key (`ts`) of the last commit is lower than the existing record's, so that record should not be shown.
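   
   For reference, a rough sketch of the precombine rule this relies on (illustrative Python only, not Hudi's actual `DefaultHoodieRecordPayload` implementation):
   ```python
   # Conceptual sketch (assumed behaviour): the payload keeps whichever record
   # has the higher precombine/ordering value, so an incoming row with ts=9
   # must not replace the stored row with ts=11.
   def merge_on_precombine(stored, incoming, precombine_field="ts"):
       """Return the row that should survive the merge."""
       if incoming[precombine_field] >= stored[precombine_field]:
           return incoming
       return stored

   stored = {"uuid": "a", "ts": 11, "part": "foo"}
   incoming = {"uuid": "a", "ts": 9, "part": "foo"}
   assert merge_on_precombine(stored, incoming)["ts"] == 11  # ts=9 is dropped
   ```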
   
   ```python
   tableName = "test_hudi_pyspark_local"
   basePath = f"/tmp/{tableName}"
   
   df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
   #mode="COPY_ON_WRITE"
   mode="MERGE_ON_READ"
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202715405|20230317202715405...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 10| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   
   # look into the timeline to get the latest commit
   # then read incremental
   # $ ll /tmp/test_hudi_pyspark_local/.hoodie/
   #  .schema
   #  archived
   #  .aux
   #  20230317202715405.deltacommit.requested
   #  .20230317202715405.deltacommit.requested.crc
   #  metadata
   #  hoodie.properties
   #  .hoodie.properties.crc
   #  20230317202715405.deltacommit.inflight
   #  .20230317202715405.deltacommit.inflight.crc
   #  20230317202715405.deltacommit
   #  .20230317202715405.deltacommit.crc
   #  20230317202717144.deltacommit.requested
   #  .20230317202717144.deltacommit.requested.crc
   #  20230317202717144.deltacommit.inflight
   #  .20230317202717144.deltacommit.inflight.crc
   #  .20230317202717144.deltacommit.crc
   #  20230317202717144.deltacommit
   #  20230317202718414.deltacommit.requested
   #  .20230317202718414.deltacommit.requested.crc
   #  .20230317202718414.deltacommit.inflight.crc
   #  20230317202718414.deltacommit.inflight
   #  .20230317202718414.deltacommit.crc
   #  20230317202718414.deltacommit
   #  .temp
   last_commit=20230317202718414
   hudi_incremental = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.query.type": "incremental",
       "hoodie.datasource.read.begin.instanttime": str(last_commit -1),
       "hoodie.datasource.read.end.instanttime": str(last_commit),
   }
   spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   ```
   
   ```python
   sc.setLogLevel("WARN")
   
   tableName = "test_hudi_pyspark_local"
   basePath = f"/tmp/{tableName}"
   
   df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
   mode="COPY_ON_WRITE"
   #mode="MERGE_ON_READ"
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203659919|20230317203659919...|                 a|                   foo|e90f346f-d878-439...|   a| 10| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   # look into the timeline to get the latest commit
   # then read incremental
   # $ ll /tmp/test_hudi_pyspark_local/.hoodie/
   # .schema
   # .aux
   # archived
   # 20230317203659919.commit.requested
   # .20230317203659919.commit.requested.crc
   # metadata
   # hoodie.properties
   # .hoodie.properties.crc
   # .20230317203659919.inflight.crc
   # 20230317203659919.inflight
   # .20230317203659919.commit.crc
   # 20230317203659919.commit
   # 20230317203701577.commit.requested
   # .20230317203701577.commit.requested.crc
   # .20230317203701577.inflight.crc
   # 20230317203701577.inflight
   # .20230317203701577.commit.crc
   # 20230317203701577.commit
   # 20230317203702776.commit.requested
   # .20230317203702776.commit.requested.crc
   # .20230317203702776.inflight.crc
   # 20230317203702776.inflight
   # .20230317203702776.commit.crc
   # 20230317203702776.commit
   last_commit=20230317203702776
   hudi_incremental = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.query.type": "incremental",
       "hoodie.datasource.read.begin.instanttime": str(last_commit -1),
       "hoodie.datasource.read.end.instanttime": str(last_commit),
   }
   spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   ```
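   
   To summarize the expectation as a small test-style sketch (the paths and instants would come from the two repros above; only the standard incremental read options already shown are used, and the helper name is just illustrative):
   ```python
   def incremental_count(spark, base_path, begin, end):
       """Count rows returned by an incremental read over (begin, end]."""
       return (
           spark.read.format("hudi")
           .option("hoodie.datasource.query.type", "incremental")
           .option("hoodie.datasource.read.begin.instanttime", begin)
           .option("hoodie.datasource.read.end.instanttime", end)
           .load(base_path)
           .count()
       )

   # The last commit only wrote ts=9, which loses the precombine against ts=11,
   # so the incremental read over that commit should be empty for both table types:
   # incremental_count(spark, cow_path, begin, end)  -> 0 (observed)
   # incremental_count(spark, mor_path, begin, end)  -> expected 0, observed 1 (this issue)
   ```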
   
   




[GitHub] [hudi] ad1happy2go commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1494210253

   JIRA created for the same - https://issues.apache.org/jira/browse/HUDI-6025




[GitHub] [hudi] ad1happy2go commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1488595629

   @codope @parisni 
   Yes, the above fix (https://github.com/apache/hudi/pull/8299) fixes this issue.




[GitHub] [hudi] parisni commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "parisni (via GitHub)" <gi...@apache.org>.
parisni commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1496493568

   > The solution would be to perform a base + log merge first (which will consider the precombine fields), then filter for the commit range (increases the cost of the query, but will give you same semantics).
   
   Indeed, that's what I would expect. I would also expect the incremental MOR read to apply the same merger that is used for snapshot reads (not only the precombine field).
   
   > How much of a blocker is this for your project? This will help us prioritize this.
   
   I'm fine with the CoW implementation right now. What would have been helpful is a clear statement in the documentation that MoR and CoW handle incremental reads differently.
   
   Thanks a lot.
   
   




[GitHub] [hudi] vinothchandar commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "vinothchandar (via GitHub)" <gi...@apache.org>.
vinothchandar commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1496165517

   @parisni To clarify the semantics a bit: an incremental query provides all the records that changed between a start and end commit time range. If there are multiple writes (CoW) or multiple compactions (MoR) between queries, you would only see the latest record (per the precombine logic) up to the compacted point, then log records after that. This is similar to the Kafka compacted topic [design](https://kafka.apache.org/documentation/#compaction), to bound the "catch up" time for downstream jobs. If one wants every change record, i.e., multiple rows in the incremental query output per key for each change, that's what the CDC feature solves (right now it's supported for CoW).
   
   As for this problem, the issue is that the reads are served out of the logs based on the commit time range, which is fine as long as we are just returning the latest committed records. In this case, there is a precombine field to respect, and that's not handled yet. The solution would be to perform a base + log merge first (which will consider the precombine field), then filter for the commit range (this increases the cost of the query, but will give you the same semantics).
   
   How much of a blocker is this for your project? This will help us prioritize this. 
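   
   For anyone following along, a hedged sketch of what that "merge first, then filter" behaviour could look like from the query side today (a reader-side approximation using the `basePath` and instants from the repro above, not the proposed internal fix):
   ```python
   # Snapshot read merges base + log files and respects the precombine field;
   # filtering the merged result on _hoodie_commit_time then approximates an
   # incremental read with the desired semantics.
   begin, end = "20230317202718413", "20230317202718414"  # instants from the repro above
   merged = spark.read.format("hudi").load(basePath)       # base + log merge
   incremental_like = merged.filter(
       (merged["_hoodie_commit_time"] > begin) & (merged["_hoodie_commit_time"] <= end)
   )
   incremental_like.show()  # empty here, matching the COW incremental output
   ```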
   
   
    




[GitHub] [hudi] vinothchandar commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "vinothchandar (via GitHub)" <gi...@apache.org>.
vinothchandar commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1496166005

   cc @lokeshj1703, who owns the JIRA




[GitHub] [hudi] ad1happy2go commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1486194924

   @parisni I was able to reproduce the issue.
   
   MOR and COW tables do have different implementations. The COW table uses the `_hoodie_commit_time` column in the table to filter the incremental read.
   
   When I tried to query using the commit time, I was able to fetch the data.
   
   <img width="871" alt="image" src="https://user-images.githubusercontent.com/63430370/228127376-d6c9c374-0cfe-4a48-8f40-b8fec354268f.png">
   




[GitHub] [hudi] codope commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1488033021

   Maybe https://github.com/apache/hudi/pull/8299 fixes this issue @parisni 




[GitHub] [hudi] parisni commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "parisni (via GitHub)" <gi...@apache.org>.
parisni commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1489063097

   @codope @ad1happy2go I am afraid they are not related. I would expect MOR not to return any data in the OP example (the same output as COW, which is correct).
   
   BTW, it turns out the MOR incremental read does not trigger a merge. It only returns the log files corresponding to the instant range.




[GitHub] [hudi] ad1happy2go commented on issue #8222: [SUPPORT] Incremental read with MOR does not work as COW

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8222:
URL: https://github.com/apache/hudi/issues/8222#issuecomment-1493623149

   @parisni Sorry, I completely misunderstood the issue last time. I didn't notice the values of ts (the precombine field).
   
   This looks like a valid data consistency issue and it still exists on master. Will raise a JIRA for the same.
   

