Posted to commits@hudi.apache.org by "Selvaraj periyasamy (Jira)" <ji...@apache.org> on 2020/11/05 08:02:00 UTC

[jira] [Created] (HUDI-1372) BoundedInMemoryExecutor takes 6 mins for 1.5 M records per file

Selvaraj periyasamy created HUDI-1372:
-----------------------------------------

             Summary: BoundedInMemoryExecutor takes 6 mins for 1.5 M records per file
                 Key: HUDI-1372
                 URL: https://issues.apache.org/jira/browse/HUDI-1372
             Project: Apache Hudi
          Issue Type: Task
            Reporter: Selvaraj periyasamy
         Attachments: image-2020-11-04-23-54-12-187.png, image-2020-11-04-23-58-00-554.png, image-2020-11-05-00-00-24-066.png

I am using Hudi 0.5.0.

I am trying to write into a COW table that has 350 columns. HoodieSparkSqlWriter takes more than 6 minutes to merge. I don't see anything spilling to disk. Is there any tuning that would improve this? I have disabled combining before upsert:

option("hoodie.combine.before.upsert","false")

 

 

20/11/05 07:43:37 INFO DefaultSource: Constructing hoodie (as parquet) data source with options :Map(hoodie.datasource.write.insert.drop.duplicates -> false, hoodie.datasource.hive_sync.database -> default, hoodie.parquet.small.file.limit -> 134217728, hoodie.copyonwrite.record.size.estimate -> 160, hoodie.insert.shuffle.parallelism -> 1000, path -> /projects/cdp/data/cdp_reporting/trr_test2, hoodie.datasource.write.precombine.field -> request_id, hoodie.datasource.hive_sync.partition_fields -> , hoodie.datasource.write.payload.class -> com.cybs.cdp.reporting.custom.CustomOverWriteWithLatestAvroPayload, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor, hoodie.parquet.max.file.size -> 268435456, hoodie.datasource.write.streaming.retry.interval.ms -> 2000, hoodie.datasource.hive_sync.table -> unknown, hoodie.datasource.write.streaming.ignore.failed.batch -> true, hoodie.datasource.write.operation -> upsert, hoodie.parquet.compression.codec -> snappy, hoodie.datasource.hive_sync.enable -> false, hoodie.datasource.write.recordkey.field -> request_id, hoodie.datasource.view.type -> read_optimized, hoodie.table.name -> trr2, hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://localhost:10000, hoodie.datasource.write.table.type -> COPY_ON_WRITE, hoodie.memory.merge.max.size -> 2004857600000, hoodie.datasource.write.storage.type -> COPY_ON_WRITE, hoodie.cleaner.policy -> KEEP_LATEST_FILE_VERSIONS, hoodie.datasource.hive_sync.username -> hive, hoodie.datasource.write.streaming.retry.count -> 3, hoodie.combine.before.upsert -> false, hoodie.datasource.hive_sync.password -> hive, hoodie.datasource.write.keygenerator.class -> org.apache.hudi.ComplexKeyGenerator, hoodie.keep.max.commits -> 3, hoodie.upsert.shuffle.parallelism -> 1000, hoodie.datasource.hive_sync.assume_date_partitioning -> false, hoodie.cleaner.commits.retained -> 1, hoodie.keep.min.commits -> 2, hoodie.datasource.write.partitionpath.field -> transaction_month, hoodie.datasource.write.commitmeta.key.prefix -> _, hoodie.index.bloom.num_entries -> 1500000)

 

 

Code snippet.

 

// Imports assumed for the option constants and save mode used below
// (they were not part of the snippet as originally posted).
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.SaveMode.Append

val responseDF = trrDF.write.format("org.apache.hudi").
 option("hoodie.insert.shuffle.parallelism","1000").
 option("hoodie.upsert.shuffle.parallelism","1000").
 option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
 option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
 option(PRECOMBINE_FIELD_OPT_KEY,"request_id").
 option("hoodie.memory.merge.max.size", "2004857600000").
 option(PARTITIONPATH_FIELD_OPT_KEY,"transaction_month").
 option(KEYGENERATOR_CLASS_OPT_KEY,"org.apache.hudi.ComplexKeyGenerator").
 option(PAYLOAD_CLASS_OPT_KEY,"com.cybs.cdp.reporting.custom.CustomOverWriteWithLatestAvroPayload").
 option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
 option("hoodie.cleaner.commits.retained", 1).
 option("hoodie.keep.min.commits",2).
 option("hoodie.keep.max.commits",3).
 option("hoodie.index.bloom.num_entries","1500000").
 option("hoodie.copyonwrite.record.size.estimate","160").
 option("hoodie.parquet.max.file.size",String.valueOf(256*1024*1024)).
 option("hoodie.parquet.small.file.limit",String.valueOf(128*1024*1024)).
 option("hoodie.parquet.compression.codec","snappy").
 // Combining (pre-combine) before upsert is disabled here.
 option("hoodie.combine.before.upsert","false").
 option(RECORDKEY_FIELD_OPT_KEY,"request_id").
 option(TABLE_NAME, "trr2").
 mode(Append).
 save("/projects/cdp/data/cdp_reporting/trr_test2")
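
As a rough sanity check on the size-related options, the sketch below recomputes the two parquet limits and compares the configured hoodie.memory.merge.max.size with the MemoryBasedMap size that HoodieMergeHandle reports in the executor log further down. The object name SizeCheck and its fields are hypothetical and only serve this illustration. The spill condition in the code is an assumption about how the spillable map behaves, not something taken from the Hudi source.

```scala
object SizeCheck {
  private val MiB: Long = 1024L * 1024L
  private val GiB: Long = 1024L * MiB

  // Values passed in the write options of this report.
  val maxFileSize: Long = 256L * MiB        // hoodie.parquet.max.file.size
  val smallFileLimit: Long = 128L * MiB     // hoodie.parquet.small.file.limit
  val mergeMaxSize: Long = 2004857600000L   // hoodie.memory.merge.max.size

  // "Total size in bytes of MemoryBasedMap" from the executor log.
  val memoryMapBytes: Long = 2492281440L

  // Integer conversions, rounded down.
  val mergeMaxGiB: Long = mergeMaxSize / GiB
  val memoryMapMiB: Long = memoryMapBytes / MiB

  // Assumption: records spill to disk only once the in-memory map
  // grows beyond the configured limit.
  val wouldSpill: Boolean = memoryMapBytes > mergeMaxSize

  def main(args: Array[String]): Unit = {
    println(s"parquet max file size   = $maxFileSize bytes")
    println(s"parquet small file limit = $smallFileLimit bytes")
    println(s"merge max size          = $mergeMaxGiB GiB (rounded down)")
    println(s"in-memory map size      = $memoryMapMiB MiB (rounded down)")
    println(s"would spill             = $wouldSpill")
  }
}
```

Under that assumption, a limit this far above the in-memory map size would be consistent with the log showing zero entries in DiskBasedMap, so the absence of spilling may simply reflect the configured limit rather than say anything about where the time goes.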

 

!image-2020-11-04-23-54-12-187.png!

 

 

Tasks corresponding to stage 20:

 

!image-2020-11-04-23-58-00-554.png!

 

Logs from one of the executors:

 

 

20/11/05 06:17:21 INFO TorrentBroadcast: Started reading broadcast variable 20
20/11/05 06:17:21 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 87.2 KB, free 12.1 GB)
20/11/05 06:17:21 INFO TorrentBroadcast: Reading broadcast variable 20 took 4 ms
20/11/05 06:17:21 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 239.4 KB, free 12.1 GB)
20/11/05 06:17:21 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 8, fetching them
20/11/05 06:17:21 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@sl73caehenc606.visa.com:33406)
20/11/05 06:17:21 INFO MapOutputTrackerWorker: Got the output locations
20/11/05 06:17:21 INFO ShuffleBlockFetcherIterator: Getting 1000 non-empty blocks out of 1000 blocks
20/11/05 06:17:21 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 3 ms
20/11/05 06:17:22 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:22 INFO HoodieMergeHandle: MaxMemoryPerPartitionMerge => 2004857600000
20/11/05 06:17:22 INFO DiskBasedMap: Spilling to file location /tmp/19d435cf-e581-4b80-b286-9aa092587c6f in host (10.160.39.146) with hostname (sl73caehdn0708.visa.com)
20/11/05 06:17:22 INFO HoodieRecordSizeEstimator: SizeOfRecord => 2552 SizeOfSchema => 273456
20/11/05 06:17:22 INFO ExternalSpillableMap: Estimated Payload size => 2664
20/11/05 06:17:22 INFO ExternalSpillableMap: New Estimated Payload size => 1688
20/11/05 06:17:37 INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 1476470Total size in bytes of MemoryBasedMap => 2492281440Number of entries in DiskBasedMap => 0Size of file spilled to disk => 0
20/11/05 06:17:37 INFO FileSystemViewManager: Creating View Manager with storage type :MEMORY
20/11/05 06:17:37 INFO FileSystemViewManager: Creating in-memory based Table View
20/11/05 06:17:37 INFO FileSystemViewManager: Creating InMemory based view for basePath /projects/cdp/data/cdp_reporting/trr_test2
20/11/05 06:17:37 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /projects/cdp/data/cdp_reporting/trr_test2
20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:37 INFO HoodieTableConfig: Loading dataset properties from /projects/cdp/data/cdp_reporting/trr_test2/.hoodie/hoodie.properties
20/11/05 06:17:37 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from /projects/cdp/data/cdp_reporting/trr_test2
20/11/05 06:17:37 INFO HoodieTableMetaClient: Loading Active commit timeline for /projects/cdp/data/cdp_reporting/trr_test2
20/11/05 06:17:37 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@7579056
20/11/05 06:17:37 INFO AbstractTableFileSystemView: Building file system view for partition (202010)
20/11/05 06:17:37 INFO AbstractTableFileSystemView: #files found in partition (202010) =27, Time taken =1
20/11/05 06:17:37 INFO HoodieTableFileSystemView: Adding file-groups for partition :202010, #FileGroups=8
20/11/05 06:17:37 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=27, FileGroupsCreationTime=2, StoreTimeTaken=0
20/11/05 06:17:37 INFO AbstractTableFileSystemView: Time to load partition (202010) =4
20/11/05 06:17:37 INFO HoodieMergeHandle: partitionPath:202010, fileId to be merged:3a404978-eaad-4825-b88a-dc24fff0c623-0
20/11/05 06:17:37 INFO HoodieMergeHandle: Merging new data into oldPath /projects/cdp/data/cdp_reporting/trr_test2/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_3-24-10680_20201105055955.parquet, as newPath /projects/cdp/data/cdp_reporting/trr_test2/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_4-24-10681_20201105061418.parquet
20/11/05 06:17:37 INFO HoodieWriteHandle: Creating Marker Path=/projects/cdp/data/cdp_reporting/trr_test2/.hoodie/.temp/20201105061418/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_4-24-10681_20201105061418.marker
20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:38 INFO CodecPool: Got brand-new compressor [.snappy]
20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q (auth:SIMPLE)]]]
20/11/05 06:17:42 INFO IteratorBasedQueueProducer: starting to buffer records
20/11/05 06:17:42 INFO BoundedInMemoryExecutor: starting consumer thread
*20/11/05 06:17:42 INFO CodecPool: Got brand-new decompressor [.snappy]*
*20/11/05 06:23:41 INFO IteratorBasedQueueProducer: finished buffering records*
*20/11/05 06:23:41 INFO BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads*
20/11/05 06:23:48 INFO HoodieMergeHandle: MergeHandle for partitionPath 202010 fileID 3a404978-eaad-4825-b88a-dc24fff0c623-0, took 385963 ms.
20/11/05 06:23:48 INFO MemoryStore: Block rdd_59_4 stored as bytes in memory (estimated size 304.0 B, free 12.1 GB)
20/11/05 06:23:48 INFO Executor: Finished task 4.0 in stage 24.0 (TID 10681). 1010 bytes result sent to driver
20/11/05 06:23:49 INFO CoarseGrainedExecutorBackend: Got assigned task 10686
20/11/05 06:23:49 INFO Executor: Running task 4.0 in stage 30.0 (TID 10686)
20/11/05 06:23:49 INFO TorrentBroadcast: Started reading broadcast variable 21
20/11/05 06:23:49 INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 87.2 KB, free 12.1 GB)
20/11/05 06:23:49 INFO TorrentBroadcast: Reading broadcast variable 21 took 4 ms
20/11/05 06:23:49 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 239.6 KB, free 12.1 GB)
20/11/05 06:23:50 INFO BlockManager: Found block rdd_59_4 locally
20/11/05 06:23:50 INFO Executor: Finished task 4.0 in stage 30.0 (TID 10686). 1103 bytes result sent to driver
20/11/05 06:23:51 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
20/11/05 06:23:51 INFO MemoryStore: MemoryStore cleared
20/11/05 06:23:51 INFO BlockManager: BlockManager stopped
20/11/05 06:23:51 INFO ShutdownHookManager: Shutdown hook called
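
To put numbers on the gap in the log above, here is a small sketch that computes the time between "starting consumer thread" and "finished buffering records", and relates it to the total reported by HoodieMergeHandle. The object name MergeTiming and its fields are hypothetical. All inputs are copied from the log lines. Note that the MemoryBasedMap entry count is most likely the incoming batch for this file group, so the per-second figure is only an approximation of merge throughput.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object MergeTiming {
  // Timestamp layout used by the executor log lines: yy/MM/dd HH:mm:ss
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Whole seconds between two log timestamps.
  def secondsBetween(start: String, end: String): Long =
    Duration.between(
      LocalDateTime.parse(start, fmt),
      LocalDateTime.parse(end, fmt)
    ).getSeconds

  // Values copied from the executor log.
  val consumerStart = "20/11/05 06:17:42"  // BoundedInMemoryExecutor: starting consumer thread
  val bufferingDone = "20/11/05 06:23:41"  // IteratorBasedQueueProducer: finished buffering records
  val recordsInMap = 1476470L              // entries in MemoryBasedMap
  val mapBytes = 2492281440L               // total size in bytes of MemoryBasedMap
  val mergeHandleMs = 385963L              // "MergeHandle ... took 385963 ms"

  val queueSeconds: Long = secondsBetween(consumerStart, bufferingDone)
  val bytesPerRecord: Long = mapBytes / recordsInMap
  // Approximation: uses the in-memory entry count as the record count.
  val recordsPerSecond: Long = recordsInMap / queueSeconds
  // Share of the merge handle time spent in the producer/consumer phase.
  val queueSharePercent: Long = queueSeconds * 1000L * 100L / mergeHandleMs

  def main(args: Array[String]): Unit = {
    println(s"queue phase       = $queueSeconds s")
    println(s"bytes per record  = $bytesPerRecord")
    println(s"records per second (approx.) = $recordsPerSecond")
    println(s"share of merge handle time   = $queueSharePercent %")
  }
}
```

The bytes-per-record figure agrees with the "New Estimated Payload size" line in the log, and nearly all of the merge handle time falls between the two highlighted lines, which is where the reported 6 minutes come from.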

 

 


