You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/25 03:05:41 UTC

[GitHub] [hudi] rubenssoto opened a new issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

rubenssoto opened a new issue #1878:
URL: https://github.com/apache/hudi/issues/1878


   Hi, how are you?
   
   Im using EMR 5.30.1, spark 2.4.5, hudi 0.5.2 and my data is store in S3.
   
   Since today Im trying to migrate some of our datasets in production to apache hudi, Im having problems with the first, could you help me please?
   
   It is a small dataset, 26gb distributed by 89 parquet files. Im reading the data with structured streaming, reading 4 files per trigger, when I write the stream in a regular parquet, works, but if I use hudi doenst work.
   
   This is my hudi options, I tryed with or without shuffle options, I need files more than 500mb with max 1000mb
   
   hudi_options = {
     'hoodie.table.name': tableName,
     'hoodie.datasource.write.recordkey.field': 'id',
     'hoodie.datasource.write.partitionpath.field': 'event_date',
     'hoodie.datasource.write.table.name': tableName,
     'hoodie.datasource.write.operation': 'insert',
     'hoodie.datasource.write.precombine.field': 'LineCreatedTimestamp',
     'hoodie.datasource.write.hive_style_partitioning': 'true',
     'hoodie.parquet.small.file.limit': 500000000,
     'hoodie.parquet.max.file.size': 800000000,
     'hoodie.datasource.hive_sync.enable': 'true',
     'hoodie.datasource.hive_sync.table': tableName,
     'hoodie.datasource.hive_sync.database': 'datalake_raw',
     'hoodie.datasource.hive_sync.partition_fields': 'event_date',
     'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://<EMR HOST>:10000',
     'hoodie.insert.shuffle.parallelism': 20,
     'hoodie.upsert.shuffle.parallelism': 20
   } 
   
   My read and write functions:
   
   def read_parquet_stream(spark_session, read_folder_path, data_schema, max_files_per_trigger):
       spark = spark_session
       df = spark \
           .readStream \
           .option("maxFilesPerTrigger", max_files_per_trigger) \
           .schema(data_schema) \
           .parquet(read_folder_path)
       return df
   
   def write_hudi_dataset_stream(spark_data_frame, checkpoint_location_folder, write_folder_path, hudi_options):
       df_write_query = spark_data_frame \
                         .writeStream \
                         .options(**hudi_options) \
                         .trigger(processingTime='20 seconds') \
                         .outputMode('append') \
                         .format('hudi')\
                         .option("checkpointLocation", checkpoint_location_folder) \
                         .start(write_folder_path)
       df_write_query.awaitTermination()
   
   I caught some errors:
   
   Job aborted due to stage failure: Task 11 in stage 2.0 failed 4 times, most recent failure: Lost task 11.3 in stage 2.0 (TID 53, ip-10-0-87-171.us-west-2.compute.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.3 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
   My cluster is small, but the data is small too
   master: 4 cores and 16gb ram
   nodes: 2 nodes with 4 cores and 16gb each
   
   If I write stream in a regular parquet takes 38min to finish the job, but in hudi it have been passed more then one hour and half and job haven't finished yet.
   
   Could you help me? I need to put this job in production as soon as possible.
   
   Thank you Guys!!! 
   
   
   <img width="1680" alt="Captura de Tela 2020-07-24 às 17 26 45" src="https://user-images.githubusercontent.com/36298331/88447482-27be5000-ce0a-11ea-889a-c5f1042fbe98.png">
   
   <img width="1680" alt="Captura de Tela 2020-07-25 às 00 03 26" src="https://user-images.githubusercontent.com/36298331/88447495-4cb2c300-ce0a-11ea-9482-7965d7646476.png">
   <img width="1680" alt="Captura de Tela 2020-07-25 às 00 05 00" src="https://user-images.githubusercontent.com/36298331/88447510-81267f00-ce0a-11ea-9311-38a395390d6b.png">
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-663906344


   Hi bvaradar, thank you for your awnser.
   
   I tried to increase spark.yarn.executor.memoryOverhead to 2GB with foreachbatch option inside writestream and it worked. 4 nodes with 4 cores and 32gb each, took 52 minutes is a good time for this hardware configuration? I think that could be better but Im very happy.
   Whats the difference with spark streaming with or without foreachbatch, Am I lost anything important? I tried, because I saw in delta lake docs, they use foreachbatch for merge in spark streaming.
   
   <img width="1680" alt="Captura de Tela 2020-07-25 às 18 04 27" src="https://user-images.githubusercontent.com/36298331/88466311-6b17cd80-cea1-11ea-9dbd-97753a2e6978.png">
   <img width="1680" alt="Captura de Tela 2020-07-25 às 18 04 53" src="https://user-images.githubusercontent.com/36298331/88466313-6eab5480-cea1-11ea-8cb9-0e9a5c30b6c4.png">
   <img width="1680" alt="Captura de Tela 2020-07-25 às 18 04 40" src="https://user-images.githubusercontent.com/36298331/88466316-70751800-cea1-11ea-8ec6-23bd69e51b17.png">
   
   
   Some jobs took more time, do you know why some jobs created a lot of tasks? I think that could be more efficient if they write with fewer tasks. 
   Now I will try do the same thing with write operation "upsert" because my data set could have some duplicated values and I don't know what files are they.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-667675696


   Bulk-insert do some deduplication?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-669431943


   Thank you so much @bvaradar for your help


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-663813751


   This is a spark tuning issue in general. The slowness is due to memory pressure and node failures due to it. Atleast in one of the batches, I see task failures (and retries) during reading from source parquet file itself. 
   
   As mentioned in the suggestion  "Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.", you need to increase spark.yarn.executor.memoryOverhead. You are running 2 executors per machine with 8GB room for each which may not have lot of room. If you are trying to compare parquet write with hudi, note that hudi adds metadata fields which gives incremental pull, indexing and other benefits. If your original record size is very small and comparable to metadata overhead and your setup is already close to hitting the limit for parquet write, then you would need to give more resources. 
   
   On a related note, since you are trying to use streaming for bootstrapping from a fixed source, have you considered using bulk insert or insert (for size handling) in batch mode which would sort and write the data once. With this mode of incremental inserting, Hudi would try to increase a small file generated in the previous batch. This means that it has to read the small file and apply new insert and write a newer version of the file (which is bigger). As you can see, more the number of iterations here, the more repeated reads will happen. Hence, you would benefit by throwing more resources for a potentially shorter time to do this migration. 
   
    
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-663927931


   Hi Again. 👍 
   
   When I changed the insert option to upsert the performance got worse.
   1 Master Node m5.xlarge(4 vcpu, 16gb Ram)
   1 Core Node r5.xlarge(4 vcpu, 32gb ram)
   4 Task Nodes r5.xlarge(4 vcpu, 32 ram)
   spark.yarn.executor.memoryOverhead: 2048
   Im reading 10 files on each trigger, at the beginning my file size is 1gb each
   
   **hudi options**
   hudi_options = {
     'hoodie.table.name': tableName,
     'hoodie.datasource.write.recordkey.field': 'id',
     'hoodie.datasource.write.partitionpath.field': 'event_date',
     'hoodie.datasource.write.table.name': tableName,
     'hoodie.datasource.write.operation': 'upsert',
     'hoodie.datasource.write.precombine.field': 'LineCreatedTimestamp',
     'hoodie.datasource.write.hive_style_partitioning': 'true',
     'hoodie.parquet.small.file.limit': 500000000,
     'hoodie.parquet.max.file.size': 900000000,
     'hoodie.datasource.hive_sync.enable': 'true',
     'hoodie.datasource.hive_sync.table': tableName,
     'hoodie.datasource.hive_sync.database': 'datalake_raw',
     'hoodie.datasource.hive_sync.partition_fields': 'event_date',
     'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://ip-10-0-53-190.us-west-2.compute.internal:10000'
   }
   
   I totally understand what you said about hudi metadata and ordering operations, but I'm trying to process only 25gb of data and only on tasks nodes I have more than 100gb of ram, I am probably  doing something wrong hehehe
   All process took 1 hour and 40 minutes.
   
   <img width="1680" alt="Captura de Tela 2020-07-25 às 22 03 23" src="https://user-images.githubusercontent.com/36298331/88469957-3c175100-cecd-11ea-9857-a82e9f249fa1.png">
   <img width="1680" alt="Captura de Tela 2020-07-25 às 22 04 03" src="https://user-images.githubusercontent.com/36298331/88469959-40436e80-cecd-11ea-98f6-225b9b30f01d.png">
   <img width="1680" alt="Captura de Tela 2020-07-25 às 22 03 34" src="https://user-images.githubusercontent.com/36298331/88469961-42a5c880-cecd-11ea-8c28-996afe0e1547.png">
   
   
   I tried the same operation in batch mode with insert operation it took 46 minutes, the overall performance it seems much better in batch mode like you could see in the follow image
   <img width="1680" alt="Captura de Tela 2020-07-25 às 23 22 30" src="https://user-images.githubusercontent.com/36298331/88470018-d37ca400-cecd-11ea-817c-ac28f62a2276.png">
   
   but this batch execution created a lot of 50Mb files, is there way to get better?
   
   ------------
   
   I think to process big workloads in batch mode with insert operation could be much more scalable, what do you think? My situation is, I have some datasets that I need to process all data, my data has to be deduplicated because is CDC data and after that I need to keep updating the data with streaming. These new datasets will be a source to create many others tables in the company.
   
   Could you advise me wich could be the better solution? I think, that I could batch all data and after that keep running a streaming solution to keep the data updated.
   
   Last question, when I run in insert mode on streaming job with foreachbatch, hudi will deduplicate only data that exist inside this specific batch? For example, I'm reading 10 files on each trigger, so, if in the next batch trigger has data that exists in the previous batch trigger, data wont be deduplicate, I'm right?
   
   Thank you so much, and I'm sorry for a lot of query, but I need to use Hudi on production ASAP
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-667023700


   For a monotonically increasing id, you can use bulk-insert instead of insert for first time loading of files, this would nicely order records by the id and your range-pruning during index lookup would be efficient. The parallelism configuration https://hudi.apache.org/docs/configurations.html#withBulkInsertParallelism controls the number of file getting generated. 
   
   `I will use aws Athena to query all my tables and this specific order table may be delayed up to 15 minutes. I saw that Athena only query Read Optmized MoR, how MoR could help me in this case?`
     ===>. Would let @umehrot2  answer this question. But If your use-case allows, you can schedule compaction for the MOR table at a frequency to align with a SLA that you want to maintain. This way you can still query those data using RO.
   
   For the insert operations, the same config (as in upsert) controls file sizing ('hoodie.parquet.max.file.size')
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-665432999


   Hi bvaradar, how are you? I hope doing fine!
   
   I have a new case, which is a little more important to me, the problem is almost the same. I adopted the strategy to first batch all data in an insert operation and after that, get the latest data with structured streaming. 
   
   Answer your question, all my tables have PK with integers id and normally they are auto-increment. Does Hudi already order data in an insert operation by pk? Because in my first batch I am sorting the data by date, is it necessary?
   
   I think I have the CoW problem that you said. I have an order table with my clients orders, every minute new orders arrive, and my clients could give a grade to the order at any point in time, for example in a streaming batch could have a client order grade for an order that was made in the last month.
   
   This table, today, is very small, in hudi dataset, are 15 files of 500mb each, I didn't partition the table because a daily partition is small and partition by month I think don't make sense. 
   My streaming is running right now, but Hudi rewrites all 15 files every streaming batch, my data is small, so its fine, but I think it is not efficient and when data the grows it could become a problem.
   
   I will use aws Athena to query all my tables and this specific order table may be delayed up to 15 minutes. I saw that Athena only query Read Optmized MoR, how MoR could help me in this case?
   
   The last question, in an insert operation, how can I control the file size?
   
   Thank you for your time!
   
   Some images of my streaming:
   ![Uploading Captura de Tela 2020-07-29 às 02.04.06.png…](
   <img width="1680" alt="Captura de Tela 2020-07-29 às 02 03 54" src="https://user-images.githubusercontent.com/36298331/88758874-ea2a3180-d13f-11ea-914c-268135f002f9.png">
   <img width="1680" alt="Captura de Tela 2020-07-29 às 02 03 33" src="https://user-images.githubusercontent.com/36298331/88758879-ebf3f500-d13f-11ea-9f13-0e731940b605.png">
   <img width="1680" alt="Captura de Tela 2020-07-29 às 02 01 51" src="https://user-images.githubusercontent.com/36298331/88758885-ee564f00-d13f-11ea-802b-c896de02ded7.png">
   
   
   )
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto closed issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto closed issue #1878:
URL: https://github.com/apache/hudi/issues/1878


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto edited a comment on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto edited a comment on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-665432999






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-668076474


   Set hoodie.combine.before.insert=true for deduping during bulk insert 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] rubenssoto commented on issue #1878: [SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer

Posted by GitBox <gi...@apache.org>.
rubenssoto commented on issue #1878:
URL: https://github.com/apache/hudi/issues/1878#issuecomment-663806475


   I tried resizing the cluster with 3 more nodes, so in total(4 nodes) after resizing I had 4 cores in each node and 16gb of ram each, and it wasn't any difference, the job keeps very slow and with memory errors.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org