Posted to issues-all@impala.apache.org by "Joe McDonnell (Jira)" <ji...@apache.org> on 2023/06/22 23:57:00 UTC

[jira] [Commented] (IMPALA-12233) Partitioned hash join with a limit can hang when using mt_dop>0

    [ https://issues.apache.org/jira/browse/IMPALA-12233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736311#comment-17736311 ] 

Joe McDonnell commented on IMPALA-12233:
----------------------------------------

A few more pieces:
 # The partitioned hash join nodes don't have a limit specified. The limit is on the streaming aggregation node above them. So, when the streaming aggregation node hits the limit, it calls Close() on its children. The Close() codepath never interacts with the cyclic barrier, which is the bug.
 # In the reproducing SQL, exactly one of the fragment instances produces rows. tpcds.store_sales is partitioned by ss_sold_date_sk and the SQL selects rows that come from the max partition. It turns out that only one fragment instance processes rows from that max partition, so all other fragment instances hit end-of-stream without producing rows. This is why they don't hit the limit in their streaming aggregation node.

In terms of fixes, it sounds like the intention was for all fragment instances to move in lockstep whether they have work to do or not. Here is a chunk of partitioned-hash-join-builder.h:
{noformat}
/// The algorithm (specifically the HashJoinState state machine) is executed in lockstep
/// across all probe threads with each probe thread working on the same set of partitions
/// at the same time. A CyclicBarrier, 'probe_barrier_', is used for synchronization.
/// At each state transition where the builder state needs to be mutated, all probe
/// threads must arrive at the barrier before proceeding. The state transition is executed
/// serially by a single thread before all threads proceed. All probe threads go through
/// the same state transitions in lockstep, even if they have no work to do. E.g. if a
/// probe thread has zero rows remaining in its spilled partitions, it still needs to
/// wait for the other probe threads.{noformat}
So, one fix is to have Close() walk through the appropriate transitions, synchronizing with the other fragment instances. There is a series of transitions, and Close() needs to know which of them have already happened, so this may take some careful work.
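
To make the failure mode and the proposed direction concrete, here is a minimal standalone sketch. It is not Impala code: ToyBarrier, SimulateEarlyClose and the three-transition count are hypothetical names and values chosen for illustration, and a timeout stands in for the hang so that the example terminates. One thread plays the fragment instance that hits the limit after the first transition; the other plays an instance that must go through every transition.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical toy barrier for illustration only; this is NOT impala::CyclicBarrier.
class ToyBarrier {
 public:
  explicit ToyBarrier(int num_threads) : expected_(num_threads) {
    assert(num_threads > 0);
  }

  // Blocks until 'expected_' threads have arrived for the current generation.
  // Returns false if the timeout expires first (a stand-in for the real hang).
  bool ArriveAndWait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    const int gen = generation_;
    if (++arrived_ == expected_) {
      // Last thread to arrive: reset the count and release everyone.
      arrived_ = 0;
      ++generation_;
      cv_.notify_all();
      return true;
    }
    const bool released =
        cv_.wait_for(l, timeout, [&] { return generation_ != gen; });
    if (!released) --arrived_;  // Withdraw our arrival on timeout.
    return released;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  const int expected_;
  int arrived_ = 0;
  int generation_ = 0;
};

// Simulates two probe threads sharing one barrier. Returns true if the thread
// that runs to completion got through every transition without timing out.
bool SimulateEarlyClose(bool close_walks_transitions) {
  constexpr int kNumTransitions = 3;  // Assumed count, for illustration.
  const auto kTimeout = std::chrono::milliseconds(500);
  ToyBarrier barrier(2);
  bool full_instance_ok = true;

  // Instance with no matching rows: goes through all transitions.
  std::thread full_instance([&] {
    for (int i = 0; i < kNumTransitions; ++i) {
      if (!barrier.ArriveAndWait(kTimeout)) {
        full_instance_ok = false;
        return;
      }
    }
  });

  // Instance that hits the limit after the first transition and is closed.
  std::thread limited_instance([&] {
    barrier.ArriveAndWait(kTimeout);
    int transitions_done = 1;
    // "Close()": either return immediately (the bug) or keep arriving at the
    // barrier for each transition that has not happened yet (the fix idea).
    if (close_walks_transitions) {
      for (; transitions_done < kNumTransitions; ++transitions_done) {
        barrier.ArriveAndWait(kTimeout);
      }
    }
  });

  full_instance.join();
  limited_instance.join();
  return full_instance_ok;
}
```

In this sketch, SimulateEarlyClose(false) returns false because the remaining thread waits at the second transition for a peer that never arrives, while SimulateEarlyClose(true) returns true. The real state machine is more involved than a fixed count of transitions, which is why tracking which transitions have already happened is the hard part.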

> Partitioned hash join with a limit can hang when using mt_dop>0
> ---------------------------------------------------------------
>
>                 Key: IMPALA-12233
>                 URL: https://issues.apache.org/jira/browse/IMPALA-12233
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Backend
>    Affects Versions: Impala 4.3.0
>            Reporter: Joe McDonnell
>            Priority: Blocker
>
> After encountering a hung query on an Impala cluster, we were able to reproduce it in the Impala developer environment with these steps:
> {noformat}
> use tpcds;
> set mt_dop=2;
> select ss_cdemo_sk from store_sales where ss_sold_date_sk = (select max(ss_sold_date_sk) from store_sales) group by ss_cdemo_sk limit 1;{noformat}
> The problem reproduces with limit values up to 183; at limit 184 and higher it does not reproduce.
> Stack traces show a thread waiting on a cyclic barrier:
> {noformat}
>  0  libpthread.so.0!__pthread_cond_wait + 0x216
>  1  impalad!impala::CyclicBarrier::Wait<impala::PhjBuilder::DoneProbingHashPartitions(const int64_t*, impala::BufferPool::ClientHandle*, impala::RuntimeProfile*, std::deque<std::unique_ptr<impala::PhjBuilderPartition> >*, impala::RowBatch*)::<lambda()> > [condition-variable.h : 49 + 0xc]
>  2  impalad!impala::PhjBuilder::DoneProbingHashPartitions(long const*, impala::BufferPool::ClientHandle*, impala::RuntimeProfile*, std::deque<std::unique_ptr<impala::PhjBuilderPartition, std::default_delete<impala::PhjBuilderPartition> >, std::allocator<std::unique_ptr<impala::PhjBuilderPartition, std::default_delete<impala::PhjBuilderPartition> > > >*, impala::RowBatch*) [partitioned-hash-join-builder.cc : 766 + 0x25]
>  3  impalad!impala::PartitionedHashJoinNode::DoneProbing(impala::RuntimeState*, impala::RowBatch*) [partitioned-hash-join-node.cc : 1189 + 0x28]
>  4  impalad!impala::PartitionedHashJoinNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) [partitioned-hash-join-node.cc : 599 + 0x15]
>  5  impalad!impala::StreamingAggregationNode::GetRowsStreaming(impala::RuntimeState*, impala::RowBatch*) [streaming-aggregation-node.cc : 115 + 0x14]
>  6  impalad!impala::StreamingAggregationNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) [streaming-aggregation-node.cc : 77 + 0x15]
>  7  impalad!impala::FragmentInstanceState::ExecInternal() [fragment-instance-state.cc : 446 + 0x15]
>  8  impalad!impala::FragmentInstanceState::Exec() [fragment-instance-state.cc : 104 + 0xf]
>  9  impalad!impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) [query-state.cc : 956 + 0xf]{noformat}
> After adding some debug logging around the locations that go through that cyclic barrier, we see one Impalad where the barrier expects two threads and only one arrives:
> {noformat}
> I0621 18:28:19.926551 210363 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000004] DoneProbingHashPartitions: num_probe_threads_=2
> I0621 18:28:19.927855 210362 streaming-aggregation-node.cc:136] 2a4787b28425372d:ac6bd96200000003] the number of rows (93) returned from the streaming aggregation node has exceeded the limit of 1
> I0621 18:28:19.928887 210362 query-state.cc:958] 2a4787b28425372d:ac6bd96200000003] Instance completed. instance_id=2a4787b28425372d:ac6bd96200000003 #in-flight=4 status=OK{noformat}
> Other instances that don't have a stuck thread see both threads arrive:
> {noformat}
> I0621 18:28:19.926223 210358 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000005] DoneProbingHashPartitions: num_probe_threads_=2
> I0621 18:28:19.926326 210359 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000006] DoneProbingHashPartitions: num_probe_threads_=2{noformat}
> So, there must be a codepath that skips going through the cyclic barrier.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
