Posted to issues-all@impala.apache.org by "Michael Ho (JIRA)" <ji...@apache.org> on 2019/03/12 16:39:00 UTC

[jira] [Commented] (IMPALA-6692) When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster

    [ https://issues.apache.org/jira/browse/IMPALA-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16790723#comment-16790723 ] 

Michael Ho commented on IMPALA-6692:
------------------------------------

Replying to my own comment: adding spilling on the receiver side may make the problem worse if the receiver is already slow because it is spilling. So the choice seems to narrow down to spilling on the sender side instead.
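
To make the trade-off concrete, here is a rough single-threaded sketch of the back pressure described in the issue quoted below, and of how a sender-side spill buffer would change it. This is not Impala code: Receiver, Result and RunExchange are hypothetical names, the round-robin routing stands in for hash partitioning, and the model assumes a healthy receiver drains one batch per step while a stalled one drains nothing. It also ignores the cost of writing and re-reading spilled batches.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy model of a partitioned exchange. All names are hypothetical.
struct Receiver {
  std::size_t capacity;   // max batches the receiver queue can hold
  bool stalled;           // true while its sort is busy and not pulling batches
  std::deque<int> queue;  // batches waiting to be pulled by the sort
  std::size_t consumed;   // batches the sort has pulled so far

  Receiver(std::size_t cap, bool is_stalled)
      : capacity(cap), stalled(is_stalled), consumed(0) {}
};

struct Result {
  std::size_t sent = 0;         // batches handed off (queued or spilled)
  std::size_t spilled = 0;      // batches parked in the sender-side spill buffer
  bool sender_blocked = false;  // true if the sender stopped on a full queue
};

// Sends num_batches batches round-robin to the receivers.
// Without sender-side spilling, the sender stops at the first full queue,
// which models a blocking send. With spilling, a batch destined for a full
// queue is parked in a per-channel buffer and the sender moves on.
Result RunExchange(std::vector<Receiver>& receivers, int num_batches,
                   bool sender_spills) {
  assert(!receivers.empty());
  Result result;
  std::vector<std::deque<int>> spill(receivers.size());
  for (int batch = 0; batch < num_batches; ++batch) {
    std::size_t dest = static_cast<std::size_t>(batch) % receivers.size();
    Receiver& recv = receivers[dest];
    if (recv.queue.size() < recv.capacity) {
      recv.queue.push_back(batch);
    } else if (sender_spills) {
      spill[dest].push_back(batch);
      ++result.spilled;
    } else {
      result.sender_blocked = true;  // every other channel now starves too
      break;
    }
    ++result.sent;
    // Each healthy receiver pulls at most one batch per step.
    for (Receiver& r : receivers) {
      if (!r.stalled && !r.queue.empty()) {
        r.queue.pop_front();
        ++r.consumed;
      }
    }
  }
  return result;
}
```

With three receivers of capacity 2 and the middle one stalled, this model's sender blocks after handing off 7 of 100 batches when it cannot spill. With sender-side spilling it hands off all 100, parking 31 of them for the stalled receiver, and the two healthy receivers keep consuming.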

> When partition exchange is followed by sort each sort node becomes a synchronization point across the cluster
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-6692
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6692
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Backend, Distributed Exec
>    Affects Versions: Impala 2.10.0
>            Reporter: Mostafa Mokhtar
>            Priority: Critical
>              Labels: perf, resource-management
>         Attachments: Kudu table insert without KRPC no sort.txt, Kudu table insert without KRPC.txt, kudu_partial_sort_insert_vd1129.foo.com_2.txt, profile-spilling.txt
>
>
> The issue described in this JIRA applies to:
> * Analytic functions
> * Writes to Partitioned Parquet tables
> * Writes to Kudu tables
> When inserting into a Kudu table from Impala, the plan is something like HDFS SCAN -> Partition Exchange -> Partial Sort -> Kudu Insert.
> The query initially makes good progress, then slows down significantly, with very few nodes making progress.
> While the insert is running, the query goes through the following phases:
> * Phase 1
> ** The scan reads data quickly and sends it through to the exchange
> ** Partial Sort keeps accumulating batches
> ** Network and CPU are busy, life appears to be OK
> * Phase 2
> ** One of the Sort operators reaches its memory limit and stops calling ExchangeNode::GetNext for a while
> ** This creates back pressure against the DataStreamSenders
> ** The Partial Sort doesn't call GetNext until it has finished sorting GBs of data (Partial sort memory is unbounded as of 03/16/2018)
> ** All exchange operators in the cluster eventually get blocked on that Sort operator and can no longer make progress
> ** After a while the Sort is able to accept more batches which temporarily unblocks execution across the cluster
> ** Another sort operator reaches its memory limit and this loop repeats itself
> Below are stacks from one of the blocked hosts:
> _Sort node waiting on data from the exchange node; it has not started sorting because the memory limit for the sort has not been reached_
> {code}
> Thread 90 (Thread 0x7f8d7d233700 (LWP 21625)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x00007fab1422174c in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.205/lib/impala/lib/libstdc++.so.6
> #2  0x0000000000b4d5aa in void std::_V2::condition_variable_any::wait<boost::unique_lock<impala::SpinLock> >(boost::unique_lock<impala::SpinLock>&) ()
> #3  0x0000000000b4ab6a in impala::KrpcDataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #4  0x0000000000b4b0c8 in impala::KrpcDataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #5  0x0000000000dca7c5 in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #6  0x0000000000dcacae in impala::ExchangeNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #7  0x0000000001032ac3 in impala::PartialSortNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
> #8  0x0000000000ba9c92 in impala::FragmentInstanceState::ExecInternal() ()
> #9  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> #10 0x0000000000b9ab1a in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #11 0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #12 0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #13 0x00000000012d70ba in thread_proxy ()
> #14 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #15 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}
> _DataStreamSender blocked due to back pressure from the DataStreamRecvr on the node which has a Sort that is spilling_
> {code}
> Thread 89 (Thread 0x7fa8f6a15700 (LWP 21626)):
> #0  0x0000003a6f00ba5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000001237e77 in impala::KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<impala::SpinLock>*) ()
> #2  0x0000000001238b8d in impala::KrpcDataStreamSender::Channel::TransmitData(impala::OutboundRowBatch const*) ()
> #3  0x0000000001238ca9 in impala::KrpcDataStreamSender::Channel::SerializeAndSendBatch(impala::RowBatch*) ()
> #4  0x0000000001238d2e in impala::KrpcDataStreamSender::Channel::SendCurrentBatch() ()
> #5  0x000000000123949f in impala::KrpcDataStreamSender::Send(impala::RuntimeState*, impala::RowBatch*) ()
> #6  0x0000000000ba9d47 in impala::FragmentInstanceState::ExecInternal() ()
> #7  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
> {code}
> _Scan node blocked due to back pressure from the DataStreamSender_
> {code}
> Thread 68 (Thread 0x7fa929667700 (LWP 21648)):
> #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000000dc9c60 in bool impala::BlockingQueue<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >::BlockingPut<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >&&) ()
> #2  0x0000000000dc61e6 in impala::ExecNode::RowBatchQueue::AddBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #3  0x0000000000dd1ca8 in impala::HdfsScanNode::AddMaterializedRowBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
> #4  0x0000000000e08adb in impala::HdfsParquetScanner::ProcessSplit() ()
> #5  0x0000000000dd219d in impala::HdfsScanNode::ProcessSplit(std::vector<impala::FilterContext, std::allocator<impala::FilterContext> > const&, impala::MemPool*, impala::io::ScanRange*) ()
> #6  0x0000000000dd3a12 in impala::HdfsScanNode::ScannerThread() ()
> #7  0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
> #8  0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #9  0x00000000012d70ba in thread_proxy ()
> #10 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
> #11 0x0000003a6ece893d in clone () from /lib64/libc.so.6
> {code}


