Posted to issues-all@impala.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2020/07/23 19:21:01 UTC

[jira] [Commented] (IMPALA-6692) When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster

    [ https://issues.apache.org/jira/browse/IMPALA-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163902#comment-17163902 ] 

ASF subversion and git services commented on IMPALA-6692:
---------------------------------------------------------

Commit 09727a8d5105cc73bbb53e1be9038972b0b65bb3 in impala's branch refs/heads/master from Riza Suminto
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=09727a8 ]

IMPALA-6692: Trigger sort node run before hitting memory limit.

The sorter node works by adding row batches to a sort run. Once all
batches have been added to the current unsorted run, or the memory
limit is hit, the sorter immediately starts sorting the run. In the
latter case, the sorter spills the sorted run to disk after the sort
completes, creates a new unsorted run object, and continues adding the
next row batches, and so on.
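
The run-building loop described above can be sketched as follows. This
is only an illustrative model, not the Impala implementation: the
function name build_sort_runs is hypothetical, memory is counted in
rows instead of bytes, and "spilling" is modeled as appending the
sorted run to a list.

```python
def build_sort_runs(batches, mem_limit_rows):
    """Model of the original sorter loop (hypothetical helper).

    Rows are added to the current unsorted run until the next batch
    would not fit in memory; then the run is sorted and "spilled".
    Returns (spilled_runs, last_in_memory_run).
    """
    spilled_runs = []
    current_run = []
    for batch in batches:
        # Memory limit hit: sort the run, spill it, start a new run.
        if current_run and len(current_run) + len(batch) > mem_limit_rows:
            current_run.sort()
            spilled_runs.append(current_run)
            current_run = []
        current_run.extend(batch)
    # All batches were added: sort whatever is left in memory.
    current_run.sort()
    return spilled_runs, current_run


spilled, in_memory = build_sort_runs([[5, 3], [4, 1], [2, 6], [0]], 4)
```

Note that in this model the sorter accepts no new batch while a run is
being sorted, which is the stall the rest of this message addresses.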

This algorithm tries to fit as many rows as possible into memory
before it starts sorting. However, in the case of a partitioned sort
with a large number of row batches, fitting too many rows into memory
makes the sort slow and blocks the sorter node for a long time before
it can release some memory and continue accepting the next row batch
from the exchange node. One slow sorter node can block the exchange
node from sending row batches to other sorter nodes that are free.

This patch makes the sorter start sorting earlier, without waiting to
hit the memory limit first, by capping the intermediate quicksort run
at a lower memory limit determined by the query option
'sort_run_bytes_limit'. If the total used reservation of the quicksort
exceeds sort_run_bytes_limit, the current unsorted_run_ is wrapped up,
sorted, and then spilled, thus overlapping the next sort run with the
spill of the previous sort run.

To reduce regressions in cases where the total input of the sort node
might fit entirely in available memory, sort_run_bytes_limit is not
enforced for the first sort run. However, the first run stays limited
by sort_run_bytes_limit if the planner estimates hint that a spill
will inevitably happen.
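
A minimal sketch of the decision described above, under stated
assumptions: should_close_run and plan_runs are hypothetical names, a
non-positive run_bytes_limit stands for "option not set", and sizes are
plain integers (think MB). It only models when a run is wrapped up, not
the sort or the spill itself.

```python
def should_close_run(run_bytes, mem_limit, run_bytes_limit,
                     is_first_run, spill_expected):
    """Decide whether the current unsorted run should be wrapped up."""
    if run_bytes >= mem_limit:
        return True   # memory limit hit: always sort and spill
    if run_bytes_limit <= 0:
        return False  # assumption: non-positive means "not limited"
    if is_first_run and not spill_expected:
        return False  # first run is exempt unless a spill is expected
    return run_bytes > run_bytes_limit


def plan_runs(batch_bytes, mem_limit, run_bytes_limit, spill_expected):
    """Return the size of each run produced for a stream of batches."""
    runs = []
    current = 0
    is_first_run = True
    for size in batch_bytes:
        current += size
        if should_close_run(current, mem_limit, run_bytes_limit,
                            is_first_run, spill_expected):
            runs.append(current)
            current = 0
            is_first_run = False
    if current:
        runs.append(current)
    return runs


batches = [256] * 12  # twelve 256 MB batches, 3072 MB in total
uncapped = plan_runs(batches, 2048, 0, False)
capped = plan_runs(batches, 2048, 512, False)
capped_spill_expected = plan_runs(batches, 2048, 512, True)
```

In this model the capped configuration still lets the first run grow to
the memory limit, while later runs close shortly after passing the cap;
when a spill is expected, every run is capped.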

We also add a new summary counter 'AddBatchTime' that summarizes how
much time is spent in Sorter::AddBatch. The max of 'AddBatchTime'
indicates the longest time spent in Sorter::AddBatch, presumably busy
doing an intermediate sort.
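
To illustrate what such a counter records, here is a small sketch. The
SummaryCounter class and timed_call helper are hypothetical and are not
Impala's profile counter API; they only show how tracking the max over
per-call durations exposes the single longest AddBatch call.

```python
import time


class SummaryCounter:
    """Tracks count, total and max of the values passed to update()."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.max = 0.0

    def update(self, value):
        self.count += 1
        self.total += value
        self.max = max(self.max, value)

    @property
    def avg(self):
        return self.total / self.count if self.count else 0.0


def timed_call(counter, fn, *args, clock=time.perf_counter):
    """Call fn(*args) and record its duration in the counter."""
    start = clock()
    try:
        return fn(*args)
    finally:
        counter.update(clock() - start)


add_batch_time = SummaryCounter()
for batch in ([3, 1, 2], [9, 8], [7]):
    # sorted() stands in for the work done inside an AddBatch call.
    timed_call(add_batch_time, sorted, batch)
```

A call that triggers an intermediate sort takes far longer than one that
only appends rows, so it dominates the max even when the average stays
small.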

Testing:
- Add new e2e test TestQueryFullSort::test_multiple_sort_run_bytes_limits
- Run core tests
- Run data loading of the 3 largest TPC-DS fact tables at 300GB scale
  into a real cluster using 5 backends and a 4GB mem_limit.
  sort_run_bytes_limit is varied between unspecified (not limited) and
  512 MB. The performance results are summarized in the following table.

+---------------+---------+--------------+-----------------------+-------------------------+
|  Insert table |  #Rows  |      Avg     |       no limit        |      512 MB limit       |
|               |         | SortDataSize +--------+--------------+---------+---------------+
|               |         |   per Node   |  Query |      Max     |  Query  |      Max      |
|               |         |              |  Time  | AddBatchTime |   Time  |  AddBatchTime |
+---------------+---------+--------------+--------+--------------+---------+---------------+
| store_sales   | 864.00M |     15.29 GB | 30m18s |     53s311ms |     20m |       5s634ms |
+---------------+---------+--------------+--------+--------------+---------+---------------+
| catalog_sales | 431.97M |     11.34 GB | 23m24s |     31s212ms |  15m27s |       3s603ms |
+---------------+---------+--------------+--------+--------------+---------+---------------+
| web_sales     | 216.01M |      5.67 GB |  8m16s |     29s250ms |   6m41s |       3s856ms |
+---------------+---------+--------------+--------+--------------+---------+---------------+
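
The relative improvement can be derived from the table with a little
arithmetic. The helper below is a sketch: to_seconds, RESULTS and
ratios are hypothetical names, and the durations are copied verbatim
from the table above.

```python
import re

# Seconds per unit for durations written like "30m18s" or "53s311ms".
UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}


def to_seconds(text):
    # "ms" is listed first so that "311ms" is not read as 311 minutes.
    parts = re.findall(r"(\d+)(ms|h|m|s)", text)
    return sum(int(number) * UNIT_SECONDS[unit] for number, unit in parts)


# Per table: (query time, max AddBatchTime) with no limit, then with
# the 512 MB limit.
RESULTS = {
    "store_sales": (("30m18s", "53s311ms"), ("20m", "5s634ms")),
    "catalog_sales": (("23m24s", "31s212ms"), ("15m27s", "3s603ms")),
    "web_sales": (("8m16s", "29s250ms"), ("6m41s", "3s856ms")),
}


def ratios(table):
    """Return {name: (query time ratio, max AddBatchTime ratio)}."""
    out = {}
    for name, (no_limit, limited) in table.items():
        query = to_seconds(no_limit[0]) / to_seconds(limited[0])
        add_batch = to_seconds(no_limit[1]) / to_seconds(limited[1])
        out[name] = (query, add_batch)
    return out


RATIOS = ratios(RESULTS)
for name, (query, add_batch) in RATIOS.items():
    print(f"{name}: query time {query:.2f}x, max AddBatchTime {add_batch:.2f}x")
```

With these numbers, query time improves by roughly 1.5x for store_sales
and catalog_sales and by roughly 1.2x for web_sales, while the max
AddBatchTime shrinks by roughly 7.5x to 9.5x.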

Change-Id: I2a0ba7c4bae4f1d300d4d9d7f594f63ced06a240
Reviewed-on: http://gerrit.cloudera.org:8080/15963
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


> When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-6692
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6692
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Backend, Distributed Exec
>    Affects Versions: Impala 2.10.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Riza Suminto
>            Priority: Critical
>              Labels: perf, resource-management
>             Fix For: Impala 4.0
>
>         Attachments: Kudu table insert without KRPC no sort.txt, Kudu table insert without KRPC.txt, kudu_partial_sort_insert_vd1129.foo.com_2.txt, profile-spilling.txt
>
>
> Issue described in this JIRA applies to 
> * Analytical functions
> * Writes to Partitioned Parquet tables
> * Writes to Kudu tables
> When inserting into a Kudu table from Impala the plan is something like HDFS SCAN -> Partition Exchange -> Partial Sort -> Kudu Insert.
> The query initially makes good progress then significantly slows down and very few nodes make progress.
> While the insert is running the query goes through different phases 
> * Phase 1
> ** Scan is reading data fast, sending data through to exchange 
> ** Partial Sort keeps accumulating batches
> ** Network and CPU are busy, life appears to be OK
> * Phase 2
> ** One of the Sort operators reaches its memory limit and stops calling ExchangeNode::GetNext for a while
> ** This creates back pressure against the DataStreamSenders
> ** The Partial Sort doesn't call GetNext until it has finished sorting GBs of data (Partial sort memory is unbounded as of 03/16/2018)
> ** All exchange operators in the cluster eventually get blocked on that Sort operator and can no longer make progress
> ** After a while the Sort is able to accept more batches which temporarily unblocks execution across the cluster
> ** Another sort operator reaches its memory limit and this loop repeats itself
> Below are stacks from one of the blocked hosts
> _Sort node waiting on data from exchange node as it didn't start sorting since the memory limit for the sort wasn't reached_
> {code}
> Thread 90 (Thread 0x7f8d7d233700 (LWP 21625)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x00007fab1422174c in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.205/lib/impala/lib/libstdc++.so.6
> #2  0x0000000000b4d5aa in void std::_V2::condition_variable_any::wait<boost::unique_lock<impala::SpinLock> >(boost::unique_lock<impala::SpinLock>&) ()
> #3  0x0000000000b4ab6a in impala::KrpcDataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #4  0x0000000000b4b0c8 in impala::KrpcDataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #5  0x0000000000dca7c5 in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #6  0x0000000000dcacae in impala::ExchangeNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #7  0x0000000001032ac3 in impala::PartialSortNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #8  0x0000000000ba9c92 in impala::FragmentInstanceState::ExecInternal() ()
> #9  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> #10 0x0000000000b9ab1a in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #11 0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #12 0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #13 0x00000000012d70ba in thread_proxy ()
> #14 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #15 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}
> _DataStreamSender blocked due to back pressure from the DataStreamRecvr on the node which has a Sort that is spilling_
> {code}
> Thread 89 (Thread 0x7fa8f6a15700 (LWP 21626)):
> #0  0x0000003a6f00ba5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000001237e77 in impala::KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<impala::SpinLock>*) ()
> #2  0x0000000001238b8d in impala::KrpcDataStreamSender::Channel::TransmitData(impala::OutboundRowBatch const*) ()
> #3  0x0000000001238ca9 in impala::KrpcDataStreamSender::Channel::SerializeAndSendBatch(impala::RowBatch*) ()
> #4  0x0000000001238d2e in impala::KrpcDataStreamSender::Channel::SendCurrentBatch() ()
> #5  0x000000000123949f in impala::KrpcDataStreamSender::Send(impala::RuntimeState*, impala::RowBatch*) ()
> #6  0x0000000000ba9d47 in impala::FragmentInstanceState::ExecInternal() ()
> #7  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> {code}
> _Scan node blocked due to back pressure from the DataStreamSender_
> {code}
> Thread 68 (Thread 0x7fa929667700 (LWP 21648)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000000dc9c60 in bool impala::BlockingQueue<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >::BlockingPut<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >&&) ()
> #2  0x0000000000dc61e6 in impala::ExecNode::RowBatchQueue::AddBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #3  0x0000000000dd1ca8 in impala::HdfsScanNode::AddMaterializedRowBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #4  0x0000000000e08adb in impala::HdfsParquetScanner::ProcessSplit() ()
> #5  0x0000000000dd219d in impala::HdfsScanNode::ProcessSplit(std::vector<impala::FilterContext, std::allocator<impala::FilterContext> > const&, impala::MemPool*, impala::io::ScanRange*) ()
> #6  0x0000000000dd3a12 in impala::HdfsScanNode::ScannerThread() ()
> #7  0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #8  0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #9  0x00000000012d70ba in thread_proxy ()
> #10 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #11 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}
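
The back pressure described in Phase 2 of the quoted issue can be
reproduced with a toy model. This is a sketch under simplifying
assumptions, not a model of KRPC: simulate is a hypothetical function,
time advances in ticks, one sender routes batch i to receiver
i % num_receivers, each receiver has a bounded queue, and one receiver
stops draining for a while (as a sort node does while it sorts).

```python
def simulate(num_receivers, queue_cap, stalled, stall_ticks, total_ticks):
    """Return the number of batches each receiver consumed."""
    queued = [0] * num_receivers
    consumed = [0] * num_receivers
    next_batch = 0
    for tick in range(total_ticks):
        # Each receiver drains one batch per tick unless it is stalled.
        for r in range(num_receivers):
            if r == stalled and tick < stall_ticks:
                continue
            if queued[r] > 0:
                queued[r] -= 1
                consumed[r] += 1
        # The sender sends batches in order and blocks as soon as the
        # target queue is full, even if other queues have room.
        for _ in range(num_receivers):
            target = next_batch % num_receivers
            if queued[target] >= queue_cap:
                break
            queued[target] += 1
            next_batch += 1
    return consumed


with_stall = simulate(3, 2, stalled=0, stall_ticks=10, total_ticks=10)
without_stall = simulate(3, 2, stalled=0, stall_ticks=0, total_ticks=10)
```

In the stalled run the two healthy receivers starve after a couple of
batches, because the sender is stuck behind the full queue of the
stalled receiver.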


