You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hudi.apache.org by Максим Радионов <ra...@gmail.com> on 2019/05/31 09:53:32 UTC

slow work of BoundedInMemoryQueue

Hi,

I think part of the update process is work slow and I'm doing something
wrong. Example work MOR at -
https://drive.google.com/open?id=17YP_V5k-g3Rp6-jaWaTWvKUSBTOPHg4g
as it seems to me a long time trying to update the existing records, and I
don't understand why it takes most of the time to work  "
BoundedInMemoryQueue" (more than 1.5 hours to overwrite 15 GB and 625 files)
I use Spark - version 2.3.0.cloudera3 and try apply 100mln records(15GB)
for snapshot 1 billion records  (1ТБ ~8k files)
I am so appreciate if anyone can help me locate this problem.

Logs, executor

19/05/29 12:47:09 INFO storage.ShuffleBlockFetcherIterator: Started
269 remote fetches in 109 ms

19/05/29 12:47:09 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=mradionov (auth:SIMPLE)]]]

19/05/29 12:47:09 INFO io.HoodieMergeHandle: MaxMemoryPerPartitionMerge => 0

19/05/29 12:47:09 INFO collection.DiskBasedMap: Spilling to file
location /tmp/91e5578f-6e25-476d-8c63-15834c7588f9 in host (1.1.1.1)
with hostname (host)

19/05/29 12:47:23 INFO io.HoodieMergeHandle: Number of entries in
MemoryBasedMap => 0Total size in bytes of MemoryBasedMap => 0Number of
entries in DiskBasedMap => 125849Size of file spilled to disk =>
75107012

19/05/29 12:47:25 INFO io.HoodieMergeHandle: Merging new data into
oldPath /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_2593_20190527161512.parquet,
as newPath /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_461_20190529115632.parquet

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO compress.CodecPool: Got brand-new compressor [.gz]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO queue.BoundedInMemoryExecutor: starting consumer thread

19/05/29 *12:47:25* INFO queue.IteratorBasedQueueProducer: starting to
buffer records

19/05/29 *13:28:44* INFO queue.IteratorBasedQueueProducer: finished
buffering records

19/05/29 13:28:44 INFO queue.BoundedInMemoryExecutor: Queue
Consumption is done; notifying producer threads



Best Regards
Maksim Radionov

Re: slow work of BoundedInMemoryQueue

Posted by Максим Радионов <ra...@gmail.com>.
Hi Balaji,

Thank you, tuning these parameters helped me.

Best Regards
Maksim

пт, 31 мая 2019 г. в 19:16, vbalaji@apache.org <vb...@apache.org>:

>
> Hi Maksim,
> It looks like the spark and hudi memory settings ( spark.executor.memory,
> spark.memory.fraction, hudi.memory.merge.fraction)  may not have been
> configured correctly to let Hudi use memory for merging. With the current
> settings you have, Hudi has no memory to use for Merge process and is
> resorting to disk based merging which will be slow but would progress
> without OOM. You would need to check your configs.
> The logic for calculating memory is in :
>
> https://github.com/apache/incubator-hudi/blob/acd74129cd97f24c0dde9bf032a4048f2ce27b5f/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java#L117
>
> Balaji.V
>
>
>     On Friday, May 31, 2019, 2:53:56 AM PDT, Максим Радионов <
> radionov.maxim@gmail.com> wrote:
>
>  Hi,
>
> I think part of the update process is work slow and I'm doing something
> wrong. Example work MOR at -
> https://drive.google.com/open?id=17YP_V5k-g3Rp6-jaWaTWvKUSBTOPHg4g
> as it seems to me a long time trying to update the existing records, and I
> don't understand why it takes most of the time to work  "
> BoundedInMemoryQueue" (more than 1.5 hours to overwrite 15 GB and 625
> files)
> I use Spark - version 2.3.0.cloudera3 and try apply 100mln records(15GB)
> for snapshot 1 billion records  (1ТБ ~8k files)
> I am so appreciate if anyone can help me locate this problem.
>
> Logs, executor
>
> 19/05/29 12:47:09 INFO storage.ShuffleBlockFetcherIterator: Started
> 269 remote fetches in 109 ms
>
> 19/05/29 12:47:09 INFO util.FSUtils: Hadoop Configuration:
> fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
> FileSystem:
> [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
> ugi=mradionov (auth:SIMPLE)]]]
>
> 19/05/29 12:47:09 INFO io.HoodieMergeHandle: MaxMemoryPerPartitionMerge =>
> 0
>
> 19/05/29 12:47:09 INFO collection.DiskBasedMap: Spilling to file
> location /tmp/91e5578f-6e25-476d-8c63-15834c7588f9 in host (1.1.1.1)
> with hostname (host)
>
> 19/05/29 12:47:23 INFO io.HoodieMergeHandle: Number of entries in
> MemoryBasedMap => 0Total size in bytes of MemoryBasedMap => 0Number of
> entries in DiskBasedMap => 125849Size of file spilled to disk =>
> 75107012
>
> 19/05/29 12:47:25 INFO io.HoodieMergeHandle: Merging new data into
> oldPath
> /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_2593_20190527161512.parquet,
> as newPath
> /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_461_20190529115632.parquet
>
> 19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
> fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
> FileSystem:
> [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
> ugi=(auth:SIMPLE)]]]
>
> 19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
> fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
> FileSystem:
> [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
> ugi=(auth:SIMPLE)]]]
>
> 19/05/29 12:47:25 INFO compress.CodecPool: Got brand-new compressor [.gz]
>
> 19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
> fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
> FileSystem:
> [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
> ugi=(auth:SIMPLE)]]]
>
> 19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
> fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
> FileSystem:
> [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
> ugi=(auth:SIMPLE)]]]
>
> 19/05/29 12:47:25 INFO queue.BoundedInMemoryExecutor: starting consumer
> thread
>
> 19/05/29 *12:47:25* INFO queue.IteratorBasedQueueProducer: starting to
> buffer records
>
> 19/05/29 *13:28:44* INFO queue.IteratorBasedQueueProducer: finished
> buffering records
>
> 19/05/29 13:28:44 INFO queue.BoundedInMemoryExecutor: Queue
> Consumption is done; notifying producer threads
>
>
>
> Best Regards
> Maksim Radionov

Re: slow work of BoundedInMemoryQueue

Posted by "vbalaji@apache.org" <vb...@apache.org>.
 
Hi Maksim,
It looks like the spark and hudi memory settings ( spark.executor.memory, spark.memory.fraction, hudi.memory.merge.fraction)  may not have been configured correctly to let Hudi use memory for merging. With the current settings you have, Hudi has no memory to use for Merge process and is resorting to disk based merging which will be slow but would progress without OOM. You would need to check your configs. 
The logic for calculating memory is in :
https://github.com/apache/incubator-hudi/blob/acd74129cd97f24c0dde9bf032a4048f2ce27b5f/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java#L117

Balaji.V


    On Friday, May 31, 2019, 2:53:56 AM PDT, Максим Радионов <ra...@gmail.com> wrote:  
 
 Hi,

I think part of the update process is work slow and I'm doing something
wrong. Example work MOR at -
https://drive.google.com/open?id=17YP_V5k-g3Rp6-jaWaTWvKUSBTOPHg4g
as it seems to me a long time trying to update the existing records, and I
don't understand why it takes most of the time to work  "
BoundedInMemoryQueue" (more than 1.5 hours to overwrite 15 GB and 625 files)
I use Spark - version 2.3.0.cloudera3 and try apply 100mln records(15GB)
for snapshot 1 billion records  (1ТБ ~8k files)
I am so appreciate if anyone can help me locate this problem.

Logs, executor

19/05/29 12:47:09 INFO storage.ShuffleBlockFetcherIterator: Started
269 remote fetches in 109 ms

19/05/29 12:47:09 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=mradionov (auth:SIMPLE)]]]

19/05/29 12:47:09 INFO io.HoodieMergeHandle: MaxMemoryPerPartitionMerge => 0

19/05/29 12:47:09 INFO collection.DiskBasedMap: Spilling to file
location /tmp/91e5578f-6e25-476d-8c63-15834c7588f9 in host (1.1.1.1)
with hostname (host)

19/05/29 12:47:23 INFO io.HoodieMergeHandle: Number of entries in
MemoryBasedMap => 0Total size in bytes of MemoryBasedMap => 0Number of
entries in DiskBasedMap => 125849Size of file spilled to disk =>
75107012

19/05/29 12:47:25 INFO io.HoodieMergeHandle: Merging new data into
oldPath /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_2593_20190527161512.parquet,
as newPath /init_1000mln/default/a46c60f1-63bf-4b8d-b24e-6a6bdb36dad9_461_20190529115632.parquet

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO compress.CodecPool: Got brand-new compressor [.gz]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO util.FSUtils: Hadoop Configuration:
fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: ],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1394640145_127,
ugi=(auth:SIMPLE)]]]

19/05/29 12:47:25 INFO queue.BoundedInMemoryExecutor: starting consumer thread

19/05/29 *12:47:25* INFO queue.IteratorBasedQueueProducer: starting to
buffer records

19/05/29 *13:28:44* INFO queue.IteratorBasedQueueProducer: finished
buffering records

19/05/29 13:28:44 INFO queue.BoundedInMemoryExecutor: Queue
Consumption is done; notifying producer threads



Best Regards
Maksim Radionov