You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues-all@impala.apache.org by "Quanlong Huang (JIRA)" <ji...@apache.org> on 2019/08/17 14:09:00 UTC

[jira] [Updated] (IMPALA-6692) When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster

     [ https://issues.apache.org/jira/browse/IMPALA-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Quanlong Huang updated IMPALA-6692:
-----------------------------------
    Target Version: Impala 3.4.0  (was: Impala 3.3.0)

> When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-6692
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6692
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Backend, Distributed Exec
>    Affects Versions: Impala 2.10.0
>            Reporter: Mostafa Mokhtar
>            Priority: Critical
>              Labels: perf, resource-management
>         Attachments: Kudu table insert without KRPC no sort.txt, Kudu table insert without KRPC.txt, kudu_partial_sort_insert_vd1129.foo.com_2.txt, profile-spilling.txt
>
>
> Issue described in this JIRA applies to 
> * Analytical functions
> * Writes to Partitioned Parquet tables
> * Writes to Kudu tables
> When inserting into a Kudu table from Impala the plan is something like HDFS SCAN -> Partition Exchange -> Partial Sort -> Kudu Insert.
> The query initially makes good progress then significantly slows down and very few nodes make progress.
> While the insert is running the query goes through different phases 
> * Phase 1
> ** Scan is reading data fast, sending data through to exchange 
> ** Partial Sort keeps accumulating batches
> ** Network and CPU is busy, life appears to be OK
> * Phase 2
> ** One of the Sort operators reaches its memory limit and stops calling ExchangeNode::GetNext for a while
> ** This creates back pressure against the DataStreamSenders
> ** The Partial Sort doesn't call GetNext until it has finished sorting GBs of data (Partial sort memory is unbounded as of 03/16/2018)
> ** All exchange operators in the cluster eventually get blocked on that Sort operator and can no longer make progress
> ** After a while the Sort is able to accept more batches which temporarily unblocks execution across the cluster
> ** Another sort operator reaches its memory limit and this loop repeats itself
> Below are stacks from one of the blocked hosts
> _Sort node waiting on data from exchange node as it didn't start sorting since the memory limit for the sort wasn't reached_
> {code}
> Thread 90 (Thread 0x7f8d7d233700 (LWP 21625)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x00007fab1422174c in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.205/lib/impala/lib/libstdc++.so.6
> #2  0x0000000000b4d5aa in void std::_V2::condition_variable_any::wait<boost::unique_lock<impala::SpinLock> >(boost::unique_lock<impala::SpinLock>&) ()
> #3  0x0000000000b4ab6a in impala::KrpcDataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #4  0x0000000000b4b0c8 in impala::KrpcDataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #5  0x0000000000dca7c5 in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #6  0x0000000000dcacae in impala::ExchangeNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #7  0x0000000001032ac3 in impala::PartialSortNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #8  0x0000000000ba9c92 in impala::FragmentInstanceState::ExecInternal() ()
> #9  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> #10 0x0000000000b9ab1a in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #11 0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #12 0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #13 0x00000000012d70ba in thread_proxy ()
> #14 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #15 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}
> _DataStreamSender blocked due to back pressure from the DataStreamRecvr on the node which has a Sort that is spilling_
> {code}
> Thread 89 (Thread 0x7fa8f6a15700 (LWP 21626)):
> #0  0x0000003a6f00ba5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000001237e77 in impala::KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<impala::SpinLock>*) ()
> #2  0x0000000001238b8d in impala::KrpcDataStreamSender::Channel::TransmitData(impala::OutboundRowBatch const*) ()
> #3  0x0000000001238ca9 in impala::KrpcDataStreamSender::Channel::SerializeAndSendBatch(impala::RowBatch*) ()
> #4  0x0000000001238d2e in impala::KrpcDataStreamSender::Channel::SendCurrentBatch() ()
> #5  0x000000000123949f in impala::KrpcDataStreamSender::Send(impala::RuntimeState*, impala::RowBatch*) ()
> #6  0x0000000000ba9d47 in impala::FragmentInstanceState::ExecInternal() ()
> #7  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> {code}
> _Scan node blocked due to back pressure from the DataStreamSender_
> {code}
> Thread 68 (Thread 0x7fa929667700 (LWP 21648)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000000dc9c60 in bool impala::BlockingQueue<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >::BlockingPut<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >&&) ()
> #2  0x0000000000dc61e6 in impala::ExecNode::RowBatchQueue::AddBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #3  0x0000000000dd1ca8 in impala::HdfsScanNode::AddMaterializedRowBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #4  0x0000000000e08adb in impala::HdfsParquetScanner::ProcessSplit() ()
> #5  0x0000000000dd219d in impala::HdfsScanNode::ProcessSplit(std::vector<impala::FilterContext, std::allocator<impala::FilterContext> > const&, impala::MemPool*, impala::io::ScanRange*) ()
> #6  0x0000000000dd3a12 in impala::HdfsScanNode::ScannerThread() ()
> #7  0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #8  0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #9  0x00000000012d70ba in thread_proxy ()
> #10 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #11 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscribe@impala.apache.org
For additional commands, e-mail: issues-all-help@impala.apache.org