Posted to commits@hudi.apache.org by "stayrascal (via GitHub)" <gi...@apache.org> on 2023/04/25 07:40:57 UTC

[GitHub] [hudi] stayrascal opened a new issue, #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

stayrascal opened a new issue, #8571:
URL: https://github.com/apache/hudi/issues/8571

   **Describe the problem you faced**
   
   I'm using Spark 3.2.1 and Hudi 0.11.1, and I have a MOR table that is written continuously by a Flink 1.15 streaming job.
   
   I'm querying the RT table via spark-sql and found that the result does not refresh when the same query is executed multiple times; the current Spark session appears to be bound to a fixed snapshot/commit. If I open a new spark-sql session and run the same query, I get the latest result, but the same problem recurs later. Since I only keep the latest 5 delta commits, a few moments later I hit a FileNotFoundException because the old parquet files have been removed.
   
   The table definition:
   ```
   CREATE TABLE flink_hudi_mor_streaming_tbl_1(
     uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
     name VARCHAR(10),
     age INT,
     ts TIMESTAMP(3)
   )
   WITH (
     'connector' = 'hudi',
     'table.type' = 'MERGE_ON_READ',
     'read.tasks' = '1',
     'write.tasks' = '4',
     'compaction.trigger.strategy' = 'num_or_time',
     'compaction.delta_commits' = '5',
     'compaction.delta_seconds' = '600',
     'compaction.tasks' = '4',
     'write.parquet.max.file.size' = '128',
     'write.batch.size' = '128',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'jdbc',
     'hive_sync.skip_ro_suffix' = 'true',
     'hive_sync.metastore.uris' = 'thrift://xxxxxx:9083',
     'hive_sync.jdbc_url' = 'jdbc:hive2:/xxxxxxx:10000',
     'hive_sync.table' = 'flink_hudi_mor_streaming_tbl_1',
     'hive_sync.db' = 'xxxx',
     'hive_sync.username' = 'xxxx',
     'hive_sync.password' = 'xxxxxx',
     'path' = 'xxxxxxxxxxxxxxx',
     'hoodie.datasource.write.recordkey.field' = 'uuid',
     'precombine.field' = 'ts'
   );
   ```
   
   
   ```
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.905 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.422 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.381 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.38 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.365 seconds, Fetched 5 row(s)
   spark-sql> call show_commits(table => 'flink_hudi_mor_streaming_tbl_1_rt');
   20230425150737787	8147	0	1	1	60	60	0
   20230425150707878	8166	0	1	1	60	60	0
   20230425150707502	440544	1	0	1	281	0	0
   20230425150637592	8164	0	1	1	60	60	0
   20230425150607557	8166	0	1	1	60	60	0
   20230425150537662	8163	0	1	1	60	60	0
   20230425150508373	8086	0	1	1	60	60	0
   20230425150447858	5885	0	1	1	42	0	0
   Time taken: 0.436 seconds, Fetched 8 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.464 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150737787' group by age order by age limit 5;
   20	14
   21	15
   22	17
   23	16
   24	18
   Time taken: 1.137 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.346 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150447858' group by age order by age limit 5;
   20	5
   21	1
   22	2
   23	3
   24	4
   Time taken: 0.713 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150508373' group by age order by age limit 5;
   20	7
   21	2
   22	4
   23	4
   24	10
   Time taken: 0.742 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt timestamp as of '20230425150537662' group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.706 seconds, Fetched 5 row(s)
   spark-sql> select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   20	8
   21	4
   22	7
   23	8
   24	12
   Time taken: 0.41 seconds, Fetched 5 row(s)
   ```
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create the tables and insert data via Flink SQL
   ```
   CREATE TABLE flink_hudi_mor_streaming_tbl_1(
     uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
     name VARCHAR(10),
     age INT,
     ts TIMESTAMP(3)
   )
   WITH (
     'connector' = 'hudi',
     'table.type' = 'MERGE_ON_READ',
     'read.tasks' = '1',
     'write.tasks' = '4',
     'compaction.trigger.strategy' = 'num_or_time',
     'compaction.delta_commits' = '5',
     'compaction.delta_seconds' = '600',
     'compaction.tasks' = '4',
     'write.parquet.max.file.size' = '128',
     'write.batch.size' = '128',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'jdbc',
     'hive_sync.skip_ro_suffix' = 'true',
     'hive_sync.metastore.uris' = 'thrift://xxxxxx:9083',
     'hive_sync.jdbc_url' = 'jdbc:hive2:/xxxxxxx:10000',
     'hive_sync.table' = 'flink_hudi_mor_streaming_tbl_1',
     'hive_sync.db' = 'xxxx',
     'hive_sync.username' = 'xxxx',
     'hive_sync.password' = 'xxxxxx',
     'path' = 'xxxxxxxxxxxxxxx',
     'hoodie.datasource.write.recordkey.field' = 'uuid',
     'precombine.field' = 'ts'
   );
   
   CREATE TABLE `fake_datasource` ( 
       `uuid` STRING, 
       `name` STRING,
       `age` INT,
       `ts` AS PROCTIME()
   ) WITH (
     'connector' = 'faker', 
     'rows-per-second' = '2',
     'fields.uuid.expression' = '#{numerify ''id####''}',
     'fields.name.expression' = '#{superhero.name}',
     'fields.age.expression' = '#{number.numberBetween ''20'',''50''}',
     'fields.ts.expression' =  '#{date.past ''45'',''10'',''SECONDS''}'
   );
   
   insert into flink_hudi_mor_streaming_tbl_1 select * from fake_datasource;
   set execution.checkpointing.interval=30sec;
   ```
   2. Start a spark-sql session and run the query multiple times
   ```
   spark-sql \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
   ```
   ```
   select age, count(*) from flink_hudi_mor_streaming_tbl_1_rt group by age order by age limit 5;
   ```
   
   
   **Expected behavior**
   
   I'm not sure whether a configuration key is wrong or missing, but I think each query should read the latest RT view at the time it is executed.
   
   **Environment Description**
   
   * Hudi version : 0.11.1
   
   * Spark version : 3.2.1
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) :
   
   * Running on Docker? (yes/no) :
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] stayrascal commented on issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

Posted by "stayrascal (via GitHub)" <gi...@apache.org>.
stayrascal commented on issue #8571:
URL: https://github.com/apache/hudi/issues/8571#issuecomment-1521704245

   After a deeper analysis, it seems the root cause is that Spark caches the plan (refer to https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L245-L257).
   
   Once a Hudi table has been scanned, the logical plan, including the file index info, is cached, and the cached plan is returned directly the next time the table is scanned. By then, however, the metadata of the Hudi table/file index may be stale, e.g. a new snapshot has been committed, or the commit/snapshot has been cleaned up.
   
   Not sure if there is any workaround for this problem.




[GitHub] [hudi] ad1happy2go commented on issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8571:
URL: https://github.com/apache/hudi/issues/8571#issuecomment-1521905748

   @stayrascal Yes, you are correct. You can set this conf when starting the Spark shell: spark.sql.filesourceTableRelationCacheSize=0
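   
   A minimal sketch of that suggestion, assuming the same launch flags as in the original report with only the cache-size setting added. Since the option is passed at startup, it has to be supplied when the session is launched. Setting the size to 0 should stop Spark from reusing the cached relation, probably at the cost of re-resolving the table on every query:
   
   ```
   # Same spark-sql launch as in the issue, plus the relation-cache setting.
   # Assumption: the Hudi bundle jar is already on the classpath, as in the original setup.
   spark-sql \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.sql.filesourceTableRelationCacheSize=0'
   ```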




[GitHub] [hudi] voonhous commented on issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on issue #8571:
URL: https://github.com/apache/hudi/issues/8571#issuecomment-1523102114

   @stayrascal Can you try running this before executing your Spark-SQL query?
   
   ```
   REFRESH TABLE flink_hudi_mor_streaming_tbl_1_rt;
   ```




[GitHub] [hudi] ad1happy2go commented on issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8571:
URL: https://github.com/apache/hudi/issues/8571#issuecomment-1527423616

   @stayrascal Closing this issue, as it is not related to Hudi but rather to a Spark SQL limitation.
   
   You can follow the workarounds above to handle it. Please reopen if you have any concerns.




[GitHub] [hudi] codope closed issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope closed issue #8571: [ISSUE] spark-sql doesn't read the latest snapshot of MOR table
URL: https://github.com/apache/hudi/issues/8571

