Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/20 19:03:47 UTC

[GitHub] [hudi] jpugliesi opened a new issue #2002: [SUPPORT] Inconsistent Commits between CLI and Incremental Query

jpugliesi opened a new issue #2002:
URL: https://github.com/apache/hudi/issues/2002


   I am seeing inconsistent commit history when querying a Hudi table incrementally (with `hoodie.datasource.read.begin.instanttime = "0"`) vs. using the CLI's `commits show`.
   
   This follows up on the [Slack thread with @bhasudha](https://apache-hudi.slack.com/archives/C4D716NPQ/p1597336199348200).
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
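   ```python
   # assumes a pyspark shell with Hudi on the classpath (`spark` and `sc` predefined)
   # and `bucket` set to the target S3 bucket name
   # load data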
   dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
   inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
   df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.show(vertical=True, truncate=False, n=3)
   
   # we'll save the non-hive hudi table here
   hudi_base_path = f"s3://{bucket}/tmp/hudi/trips_cow"
   
   # https://hudi.apache.org/docs/configurations.html
   hudi_insert_options = {
     'hoodie.table.name': "trips_cow",
     'hoodie.datasource.write.recordkey.field': 'uuid',
     'hoodie.datasource.write.partitionpath.field': '', # empty: no partition field for the initial bulk insert (the upserts below use 'partitionpath')
     'hoodie.datasource.write.precombine.field': 'ts', # this defines how to compare rows with the same record key - "greatest" value wins (using .compareTo)
     'hoodie.datasource.write.operation': 'bulk_insert', # note we're bulk inserting the initial dataset
     'hoodie.consistency.check.enabled': 'true', # enable consistency checking for S3
     'hoodie.upsert.shuffle.parallelism': 2, 
     'hoodie.insert.shuffle.parallelism': 2
   }
   
   print("Initial insert")
   df.write.format("hudi"). \
     options(**hudi_insert_options). \
     mode("overwrite"). \
     save(hudi_base_path)
   
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime")
   
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after initial insert:", commits)
   
   ###################################################
   # Upsert new data - this should create a new commit
   ###################################################
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   # https://hudi.apache.org/docs/configurations.html
   hudi_upsert_options = {
     # table config
     'hoodie.table.name': "trips_cow",
     # normal hudi write config
     'hoodie.datasource.write.recordkey.field': 'uuid',
     'hoodie.datasource.write.partitionpath.field': 'partitionpath',
     'hoodie.datasource.write.precombine.field': 'ts',
     'hoodie.datasource.write.operation': 'upsert', # (specifies upsert)
     'hoodie.consistency.check.enabled': 'true',
     'hoodie.upsert.shuffle.parallelism': 2, 
     'hoodie.insert.shuffle.parallelism': 2
   }
   
   print("First Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after first upsert:", commits)
   
   #### Upsert 2
   # Write updates again - we expect there to be 3 commits after this
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   print("Second Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after second upsert:", commits)
   
   # Write updates again - we expect there to be 4 commits after this
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   print("Third Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after third upsert:", commits)
   ```
   
   Results:
   ```
   Initial insert
   Commits after initial insert: ['20200820185825']
   First Upsert
   Commits after first upsert: ['20200820185825', '20200820185832']
   Second Upsert
   Commits after second upsert: ['20200820185825', '20200820185832', '20200820185841']
   Third Upsert
   Commits after third upsert: ['20200820185825', '20200820185841', '20200820185848']
   ```
   
   **Expected behavior**
   Notice that the incremental view returns only 3 commits after the third upsert; I expect 4 commits to be returned.
   
   Additionally, the Hudi CLI shows 4 commits, as expected:
   ![image](https://user-images.githubusercontent.com/2141170/90813765-e328b600-e2dc-11ea-869a-570cd27cf853.png)
   
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] jpugliesi edited a comment on issue #2002: [SUPPORT] Inconsistent Commits between CLI and Incremental Query

Posted by GitBox <gi...@apache.org>.
jpugliesi edited a comment on issue #2002:
URL: https://github.com/apache/hudi/issues/2002#issuecomment-679298548


   @bvaradar I suspected this may have been the case, but I was not able to find any documentation anywhere that states that a commit tracks the timestamp of when a _specific subset of records is changed_, as opposed to the timestamp of when a write operation was executed. Does such documentation exist, and if so, can you please point me to it?
   
   Since an incremental query does not necessarily return the full set of table commits, is there an alternative way to get the full table commit history through the Spark API or some other API _besides the CLI_?
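A minimal sketch of one way to list commits outside the CLI, assuming the Hudi 0.5.x timeline layout in which each completed copy-on-write commit appears as a `<instant>.commit` file under `<base_path>/.hoodie/` (in-progress states show up as `.commit.requested` or `.inflight` files). The helper `completed_commits` is hypothetical: it only parses file names, which could be obtained with any S3 or Hadoop FileSystem listing (for example `aws s3 ls s3://<bucket>/tmp/hudi/trips_cow/.hoodie/`), and it cannot see instants that have already been archived to `.hoodie/archived`. The file names below are illustrative, built from the commit times in the results above. On the JVM side, `HoodieDataSourceHelpers.listCommitsSince` in the hudi-spark module may be a supported alternative worth verifying against the version in use.

```python
# Hypothetical helper (an assumption, not a Hudi API): derive completed commit
# instants from file names found in a table's `.hoodie/` timeline directory.
COMPLETED_ACTIONS = {"commit", "deltacommit"}  # COW and MOR completed actions


def completed_commits(timeline_file_names):
    instants = []
    for name in timeline_file_names:
        instant, _, action = name.partition(".")
        # Only "<digits>.commit" / "<digits>.deltacommit" count as completed;
        # "...commit.requested" and "...inflight" are skipped.
        if instant.isdigit() and action in COMPLETED_ACTIONS:
            instants.append(instant)
    return sorted(instants)


# Illustrative listing of a `.hoodie/` directory.
names = [
    "hoodie.properties",
    "archived",
    "20200820185825.commit",
    "20200820185832.commit.requested",
    "20200820185832.inflight",
    "20200820185832.commit",
    "20200820185841.commit",
    "20200820185848.commit",
]
print(completed_commits(names))
```

Unlike the incremental query, this reads the timeline itself, so a commit whose records were all overwritten later still appears.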





[GitHub] [hudi] bvaradar commented on issue #2002: [SUPPORT] Inconsistent Commits between CLI and Incremental Query

bvaradar commented on issue #2002:
URL: https://github.com/apache/hudi/issues/2002#issuecomment-679275414


   @jpugliesi: It looks like the 2nd and 3rd upserts updated the same set of records (`generateUpdates()`). In that case, all of those records are updated with the latest commit time, and the incremental query will only show the commit time of the 3rd upsert. Hope this is clear. Please reopen if this does not make sense to you.
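A simplified pure-Python model of the behavior described above, not Hudi's implementation: each record key keeps only the commit time of the last write that touched it (as the `_hoodie_commit_time` column does), while the timeline keeps every write. The commit times come from the results in the issue; the key sets passed to each upsert are hypothetical, chosen so that every key touched by the first upsert is overwritten later. With `generateUpdates()`, whether that happens depends on which existing keys it picks.

```python
# Simplified model (assumption): a table is a dict of record key -> commit time.


def apply_write(table, commit_time, keys):
    """Insert/upsert: every touched key is stamped with this write's commit time."""
    for key in keys:
        table[key] = commit_time


def distinct_commit_times(table, begin_time="0"):
    """Roughly what `select distinct(_hoodie_commit_time)` returns for an
    incremental read that starts after `begin_time`."""
    return sorted({t for t in table.values() if t > begin_time})


table = {}
timeline = []  # every completed write, as `commits show` would list it

writes = [
    ("20200820185825", range(10)),  # initial insert of 10 keys
    ("20200820185832", [1, 2, 3]),  # first upsert (hypothetical keys)
    ("20200820185841", [1, 2, 5]),  # second upsert (hypothetical keys)
    ("20200820185848", [3, 7]),     # third upsert (hypothetical keys)
]
for commit_time, keys in writes:
    apply_write(table, commit_time, keys)
    timeline.append(commit_time)

print("timeline:   ", timeline)
print("incremental:", distinct_commit_times(table))
# incremental: ['20200820185825', '20200820185841', '20200820185848']
```

The first upsert's commit is still on the timeline, but no record carries its commit time any more, so it drops out of the incremental result.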





[GitHub] [hudi] bvaradar closed issue #2002: [SUPPORT] Inconsistent Commits between CLI and Incremental Query

bvaradar closed issue #2002:
URL: https://github.com/apache/hudi/issues/2002


   




