Posted to reviews@impala.apache.org by "Yida Wu (Code Review)" <ge...@cloudera.org> on 2023/05/11 17:45:23 UTC

[Impala-ASF-CR] IMPALA-11064 Optimizing Temporary File Structure for Batch Reading

Yida Wu has uploaded a new patch set (#11). ( http://gerrit.cloudera.org:8080/18219 )

Change subject: IMPALA-11064 Optimizing Temporary File Structure for Batch Reading
......................................................................

IMPALA-11064 Optimizing Temporary File Structure for Batch Reading

This patch optimizes the structure of temporary files to improve
batch reading performance. It is a follow-up to IMPALA-10791.

With this patch, there are two types of structures. The first is the
original one, which allocates the space for a new page from the most
recently allocated file; when that file is full, we create a new file
and allocate the space from it.

The second is the new structure, in which a file contains multiple
blocks and all the data in a block belongs to the same spill id. Once
a block is full, we first try to allocate a new block in the same
file; if the file is full, we allocate the block from a new file.
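To make the new layout concrete, here is a minimal C++ sketch of the
block allocation policy under a toy model: every file holds a fixed
number of equally sized blocks, each spill id writes into one open
block at a time, and a page never exceeds a block. BlockAllocator,
Allocate and blocks_per_file are hypothetical names, not the actual
TmpFileMgr code, and taking a spill id's first block from the newest
file is an assumption the message does not state.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical toy model, not TmpFileMgr: each file is split into
// 'blocks_per_file' blocks of 'block_size' bytes, and a block only ever
// holds pages of a single spill id.
struct BlockLocation {
  int file_idx;   // which file holds the block
  int block_idx;  // position of the block inside that file
};

class BlockAllocator {
 public:
  BlockAllocator(int64_t block_size, int blocks_per_file)
      : block_size_(block_size), blocks_per_file_(blocks_per_file) {
    assert(block_size > 0 && blocks_per_file > 0);
  }

  // Returns the block that receives a 'len'-byte page of 'spill_id'.
  BlockLocation Allocate(int64_t spill_id, int64_t len) {
    assert(len > 0 && len <= block_size_);
    auto it = open_blocks_.find(spill_id);
    if (it != open_blocks_.end()) {
      OpenBlock& block = it->second;
      if (block.used + len <= block_size_) {  // the page fits: append
        block.used += len;
        return block.loc;
      }
      // The block is full: first try a new block in the same file.
      int file_idx = block.loc.file_idx;
      if (blocks_used_[file_idx] < blocks_per_file_) {
        return Open(spill_id, {file_idx, blocks_used_[file_idx]++}, len);
      }
    } else if (!blocks_used_.empty() && blocks_used_.back() < blocks_per_file_) {
      // Assumption: a spill id's first block comes from the newest file.
      int file_idx = num_files() - 1;
      return Open(spill_id, {file_idx, blocks_used_[file_idx]++}, len);
    }
    // The file is full (or no file exists yet): start a new file.
    blocks_used_.push_back(1);
    return Open(spill_id, {num_files() - 1, 0}, len);
  }

  int num_files() const { return static_cast<int>(blocks_used_.size()); }

 private:
  struct OpenBlock {
    BlockLocation loc;
    int64_t used;  // bytes written into the block so far
  };

  // Makes 'loc' the open block of 'spill_id', already holding 'len' bytes.
  BlockLocation Open(int64_t spill_id, BlockLocation loc, int64_t len) {
    open_blocks_[spill_id] = OpenBlock{loc, len};
    return loc;
  }

  int64_t block_size_;
  int blocks_per_file_;
  std::vector<int> blocks_used_;  // number of blocks handed out per file
  std::unordered_map<int64_t, OpenBlock> open_blocks_;
};
```

Because pages of one spill id keep landing in the same block until it
fills, a sequential reader of that spill id can fetch a whole block at
once instead of individual pages.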

The new structure benefits batch reading by gathering the data with
the same spill id (normally from the same partitioned hash join node)
into the same block, which helps when the node reads its data
sequentially from the remote filesystem.

To use the new structure more efficiently, the patch also adds the
following features.

1. Batch reading is used only for partitioned hash join nodes.
Because partitioned hash join nodes pin the data back into memory
sequentially, this limitation saves memory. Data spilled by other
nodes, such as grouping aggregation nodes, is still read page by
page, because the reads from these nodes can be quite random.

2. Prefetch the block to be read.
When pinning a page from a file using batch reading, we try to
prefetch a block ahead of the current read block (the step number is
configurable). Since batch reading is limited to sequential reads,
prefetching the block can speed up reading.

3. Auto file uploader.
Spilling the data into different blocks by spill id, instead of
always appending to the end of the last file, can leave more files
half-written and consume more local disk buffer. To deal with this,
the auto file uploader uploads a file once a timeout period has
elapsed since its creation or its last access.

New startup options:
'remote_batch_read_block_size_level'
The default value is 3, which means the maximum block size is
2^3MB = 8MB.

'remote_batch_read_tmp_file_size'
Specifies a separate file size for batch reading. If set to 0,
remote_tmp_file_size is used as the file size.
The default is 16MB for batch-reading files, because in the tests,
smaller files performed better with batch reading.
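Both startup options reduce to simple arithmetic. A minimal sketch
with hypothetical function names, assuming MB means 2^20 bytes:

```cpp
#include <cassert>
#include <cstdint>

// Assumption: MB means 1024 * 1024 bytes here.
constexpr int64_t kMB = 1024 * 1024;

// Maximum block size for 'remote_batch_read_block_size_level':
// 2^level MB, so the default level 3 gives 8MB.
int64_t MaxBatchReadBlockSize(int level) {
  assert(level >= 0 && level < 40);  // keep the result within int64_t
  return (int64_t{1} << level) * kMB;
}

// File size for batch-reading files: 'remote_batch_read_tmp_file_size'
// unless it is 0, in which case 'remote_tmp_file_size' is used.
int64_t BatchReadFileSize(int64_t remote_batch_read_tmp_file_size,
                          int64_t remote_tmp_file_size) {
  return remote_batch_read_tmp_file_size != 0 ? remote_batch_read_tmp_file_size
                                              : remote_tmp_file_size;
}
```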

New query options:
'remote_batch_read_prefetch_step'
Specifies the step number for prefetching. For example, if the step
is 1 and we are reading the data from a block with spill id 1 and
sequence number 0, we try to prefetch the block with spill id 1 and
sequence number 1. If the step is 2, the block with spill id 1 and
sequence number 2 is prefetched instead. If the last sequence number
of the spill id has been reached, we try to prefetch the block with
spill id 0 and sequence number 0. If there is no such block, nothing
is prefetched.
The default value is 1.
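The prefetch rule can be sketched as a lookup over the blocks that
exist. PrefetchTarget and BlockKey are hypothetical names. The message
only gives spill id 0 / sequence number 0 as the example fallback once
a spill id runs out of blocks, so the sketch takes the fallback block
as a parameter rather than guessing how it is chosen, and it assumes
the fallback applies whenever no block exists 'step' positions ahead.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <set>
#include <utility>

// A block is identified by (spill id, sequence number).
using BlockKey = std::pair<int64_t, int64_t>;

// Returns the block to prefetch while reading 'current', or
// std::nullopt if there is nothing to prefetch. 'step' models
// 'remote_batch_read_prefetch_step'; 'fallback' is the block tried
// once the spill id has no block 'step' positions ahead.
std::optional<BlockKey> PrefetchTarget(const std::set<BlockKey>& blocks,
                                       BlockKey current, int64_t step,
                                       BlockKey fallback) {
  assert(step >= 1);
  BlockKey ahead{current.first, current.second + step};
  if (blocks.count(ahead) > 0) return ahead;
  if (blocks.count(fallback) > 0) return fallback;
  return std::nullopt;  // no such block: do nothing
}
```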

'base_spill_id_level'
Controls the range within which spill ids are generated. For example,
if the value is 6, we generate a random number within 2^6=64 and
left-shift it by 16 bits to get the base spill id for the partitioned
hash join builder (so every base spill id is below 64 << 16). We then
assign the spill id, which is base spill id + partition id, to each
partition of the partitioned hash join node.
The purpose is to limit the number of temporary files that can be
created when too many spill ids are in use at the same time. Too many
files can easily use up the local disk buffer and slow down not only
the current query but also other spilling queries.
The default value is 6. Setting it to -1 disables the feature, in
which case the pointer address of the builder is used as the base
spill id.
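A sketch of the spill id scheme described above, assuming
std::mt19937_64 as the random source and hypothetical function names;
the real builder code may draw its random number differently.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Base spill id for a partitioned hash join builder, following
// 'base_spill_id_level': a random r in [0, 2^level) shifted left by
// 16 bits. Level -1 falls back to the builder's pointer address.
int64_t BaseSpillId(int level, const void* builder, std::mt19937_64& rng) {
  if (level == -1) {
    return static_cast<int64_t>(reinterpret_cast<std::uintptr_t>(builder));
  }
  assert(level >= 0 && level <= 46);  // keep (r << 16) within int64_t
  std::uniform_int_distribution<int64_t> dist(0, (int64_t{1} << level) - 1);
  return dist(rng) << 16;
}

// The spill id of a partition is the base spill id plus the partition id.
int64_t SpillId(int64_t base_spill_id, int partition_id) {
  return base_spill_id + partition_id;
}
```

With level 6 there are only 64 possible base values, so concurrent
builders presumably share a bounded set of spill ids instead of each
getting its own, which bounds the number of files.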

'remote_batch_buffer_file_limit'
Limits the number of local buffer files that the current query can
use for batch reading. Like 'base_spill_id_level', the purpose is to
limit the number of files used by batch reading, because they may
block spilling from nodes other than partitioned hash join nodes.
The default value is 0, which means the option imposes no limit and
the global limit applies, which is half of the number of local
buffer files.
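A sketch of how the effective limit might be resolved. The names are
hypothetical, and the sketch assumes that when the option is set, the
global limit still applies as an upper bound, which the message does
not state explicitly.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// 'query_limit' models 'remote_batch_buffer_file_limit' (0 = no
// per-query limit); 'total_buffer_files' is the hypothetical capacity
// of the local disk buffer in files. The global limit is half of it.
int64_t BatchReadFileLimit(int64_t query_limit, int64_t total_buffer_files) {
  assert(query_limit >= 0 && total_buffer_files >= 0);
  int64_t global_limit = total_buffer_files / 2;
  if (query_limit == 0) return global_limit;
  // Assumption: both limits apply when the option is set.
  return std::min(query_limit, global_limit);
}
```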

'auto_upload_timeout_s'
The timeout, in seconds, for the auto file uploader. If the timeout
period has elapsed since a file was created or last accessed, the
uploader forces the file to be uploaded to the remote filesystem.
Only applies to files used for batch reading.
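The uploader's trigger condition can be sketched as a predicate over
per-file timestamps. The BufferFile struct and its fields are
hypothetical, and the sketch leaves out how the uploader thread scans
the files and performs the upload.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Hypothetical per-file state tracked by the uploader.
struct BufferFile {
  Clock::time_point created;
  Clock::time_point last_access;
  bool batch_read;  // only batch-reading files are auto-uploaded
  bool uploaded;
};

// True if the file should be forced to the remote filesystem: the
// timeout ('auto_upload_timeout_s') has elapsed since the later of its
// creation and its last access.
bool ShouldAutoUpload(const BufferFile& f, Clock::time_point now,
                      std::chrono::seconds timeout) {
  if (!f.batch_read || f.uploaded) return false;
  Clock::time_point last_activity = std::max(f.created, f.last_access);
  return now - last_activity >= timeout;
}
```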

Performance evaluation:
Tested in Cloudera CDW with two execution nodes. Each query was run
3 times and the average is reported.
-- TPCDS 3000(3T)
-- 16GB buffer pool limit(memory)
-- 10GB local disk buffer
-- 8GB read memory buffer
-- file size: 16MB(batch read files)/256MB(normal)
-- block size: 8MB
Query   NonBatch(s)   Batch(s)   Improvement(%)
Q4      3027          2462       18.66
Q64     944.67        710        24.84
Q78     1483          1402       5.46
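The Improvement(%) column is the relative reduction in runtime,
(NonBatch - Batch) / NonBatch * 100. This small check recomputes it
from the table values.

```cpp
#include <cassert>
#include <cmath>

// Relative runtime reduction, in percent, of batch over non-batch reading.
double ImprovementPct(double non_batch_s, double batch_s) {
  assert(non_batch_s > 0);
  return (non_batch_s - batch_s) / non_batch_s * 100.0;
}
```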

The evaluation above covers several queries containing partitioned
hash join nodes, and batch reading improves all of them. The
difference in improvement among the queries may be related to the
total amount of spilled data, the amount spilled per partition, and
the number of partitions. Tuning the parameters can heavily affect
the results. Also, because the queries are quite complicated,
spilling is not the only factor in their performance; the profiles
need further investigation to explain the differences.

Testing:
Ran exhaustive tests.
Added testcases in TmpFileMgrTest.
Added ee test test_scratch_dirs_batch_reading.

Change-Id: If913785cac9e2dafa20013b6600c87fcaf3e2018
---
M be/src/exec/partitioned-hash-join-builder.cc
M be/src/exec/partitioned-hash-join-builder.h
M be/src/exec/partitioned-hash-join-node.cc
M be/src/exec/partitioned-hash-join-node.h
M be/src/runtime/buffered-tuple-stream.cc
M be/src/runtime/buffered-tuple-stream.h
M be/src/runtime/bufferpool/buffer-pool-internal.h
M be/src/runtime/bufferpool/buffer-pool.cc
M be/src/runtime/bufferpool/buffer-pool.h
M be/src/runtime/io/disk-file.cc
M be/src/runtime/io/disk-file.h
M be/src/runtime/io/disk-io-mgr-test.cc
M be/src/runtime/io/disk-io-mgr.cc
M be/src/runtime/io/file-writer.h
M be/src/runtime/io/local-file-writer.cc
M be/src/runtime/io/local-file-writer.h
M be/src/runtime/io/request-context.cc
M be/src/runtime/io/request-context.h
M be/src/runtime/io/scan-range.cc
M be/src/runtime/query-state.cc
M be/src/runtime/tmp-file-mgr-internal.h
M be/src/runtime/tmp-file-mgr-test.cc
M be/src/runtime/tmp-file-mgr.cc
M be/src/runtime/tmp-file-mgr.h
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M tests/custom_cluster/test_scratch_disk.py
29 files changed, 2,256 insertions(+), 509 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/19/18219/11
-- 
To view, visit http://gerrit.cloudera.org:8080/18219
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If913785cac9e2dafa20013b6600c87fcaf3e2018
Gerrit-Change-Number: 18219
Gerrit-PatchSet: 11
Gerrit-Owner: Yida Wu <wy...@gmail.com>
Gerrit-Reviewer: Abhishek Rawat <ar...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Smith <mi...@cloudera.com>
Gerrit-Reviewer: Qifan Chen <qf...@hotmail.com>
Gerrit-Reviewer: Yida Wu <wy...@gmail.com>