Posted to issues@impala.apache.org by "Sailesh Mukil (JIRA)" <ji...@apache.org> on 2017/09/07 22:56:00 UTC

[jira] [Resolved] (IMPALA-5910) Data stream sender timeout causes query hang when 0 rows sent through channel

     [ https://issues.apache.org/jira/browse/IMPALA-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sailesh Mukil resolved IMPALA-5910.
-----------------------------------
    Resolution: Duplicate

> Data stream sender timeout causes query hang when 0 rows sent through channel
> -----------------------------------------------------------------------------
>
>                 Key: IMPALA-5910
>                 URL: https://issues.apache.org/jira/browse/IMPALA-5910
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Distributed Exec
>    Affects Versions: Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
>            Reporter: Tim Armstrong
>            Priority: Critical
>              Labels: hang
>
> We saw this bug on an internal cluster that was under heavy load and having trouble opening connections. It manifested as a query hang.
> Here are the fragments:
> {code}
> Host	Num. instances	Num. remaining instances	Done	Peak mem. consumption	Time since last report (ms)
> h40:22000	1	1	false	8192	3369
> h30:22000	2	0	true	112573520	34000630
> h24:22000	2	1	false	96816946	3371
> {code}
> The thread is hung waiting on the exchange:
> {code}
> https://h24:25000/thread-group?group=fragment-execution
> Thread name	Id	Cumulative User CPU(s)	Cumulative Kernel CPU(s)	Cumulative IO-wait(s)
> profile-report (finst:641169ef2c0c8ea:8edf87e100000003) 	31426 	1.92 	0.56 	0
> exec-finstance (finst:641169ef2c0c8ea:8edf87e100000003) 	31420 	0.14 	0 	0
> Pstack output:
> Thread 1054 (Thread 0x7fbe693cd700 (LWP 31420)):
> #0  0x0000003fdd40b5bc in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1  0x0000000000820a73 in boost::condition_variable::wait(boost::unique_lock<boost::mutex>&) ()
> #2  0x0000000000a1cd01 in impala::DataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #3  0x0000000000a1d050 in impala::DataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #4  0x0000000000c3247d in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #5  0x0000000000c32e61 in impala::ExchangeNode::Open(impala::RuntimeState*) ()
> #6  0x0000000000ccd505 in impala::PartitionedAggregationNode::Open(impala::RuntimeState*) ()
> #7  0x0000000000a50f8d in impala::FragmentInstanceState::Open() ()
> #8  0x0000000000a526fb in impala::FragmentInstanceState::Exec() ()
> #9  0x0000000000a2f158 in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #10 0x0000000000bd3a72 in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*) ()
> #11 0x0000000000bd41d4 in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*), boost::_bi::list4<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #12 0x0000000000e5fb9a in ?? ()
> {code}
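> A minimal sketch of the wait that frame #2 above is blocked in (assumed member names, not the exact Impala source): the receiver only wakes up when a row batch arrives or when every sender has delivered its end-of-stream via CloseSender().
> {code}
> // Sketch of the condition the blocked thread waits on. If a sender's
> // end-of-stream is silently dropped, num_remaining_senders_ never reaches 0
> // and this wait never returns, matching the pstack above.
> #include <boost/thread/condition_variable.hpp>
> #include <boost/thread/locks.hpp>
> #include <boost/thread/mutex.hpp>
> #include <deque>
>
> struct SenderQueueSketch {
>   boost::mutex lock_;
>   boost::condition_variable data_arrival_cv_;
>   std::deque<int> batch_queue_;     // stand-in for the queued row batches
>   int num_remaining_senders_ = 1;   // decremented when a sender closes
>
>   void WaitForBatchOrEos() {
>     boost::unique_lock<boost::mutex> l(lock_);
>     while (batch_queue_.empty() && num_remaining_senders_ > 0) {
>       data_arrival_cv_.wait(l);
>     }
>   }
> };
> {code}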
> The plan is
> {code}
> F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
> Per-Host Resources: mem-estimate=240.00MB mem-reservation=34.00MB
> 01:AGGREGATE [STREAMING]
> |  group by: c1, c2, concat(CAST(c3 AS STRING), ';', c4)
> |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
> |  tuple-ids=1 row-size=46B cardinality=unavailable
> |
> 00:SCAN HDFS [d.t, RANDOM]
>    partitions=1/3000 files=2 size=82.09MB
>    predicates: identifier = 'xxxxxxxxxxx'
>    stats-rows=unavailable extrapolated-rows=disabled
>    table stats: rows=unavailable size=unavailable
>    columns missing stats: ....
>    parquet dictionary predicates: identifier = 'xxxxxxxxxxxx'
>    mem-estimate=112.00MB mem-reservation=0B
>    tuple-ids=0 row-size=45B cardinality=unavailable
> {code}
> I looked at the profile and saw that all the scans produced 0 rows.
> I saw in one log that there was a datastream sender timeout
> {code}
> h24-impalad.INFO:I0906 11:55:25.695996  2894 data-stream-mgr.cc:130] Datastream sender timed-out waiting for recvr for fragment instance: 641169ef2c0c8ea:8edf87e100000003 (time-out was: 2m). Increase --datastream_sender_timeout_ms if you see this message frequently.
> {code}
> It looks like all non-hung instances for that query finished with an OK status, which means that FIS::Exec() returned OK.
> {noformat}
> h24-impalad.INFO:I0906 11:56:41.212296 31421 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000001 #in-flight=613 status=OK
> h30-impalad.INFO:I0906 11:55:26.268643  4850 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000002 #in-flight=137 status=OK
> h30-impalad.INFO:I0906 11:56:41.213317  4848 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000004 #in-flight=121 status=OK
> {noformat}
> Tracing from this OK status to the data stream sender timeout, we get
> * FIS::Exec() should fail with an error from DataStreamSender::Send() or DataStreamSender::FlushFinal() if there's a timeout. 
> * For this query, Send() is not applicable because the fragment returned 0 rows.
> * Therefore DataStreamSender::FlushFinal() should have returned an error, but it did not.
> * FlushFinal() only returns OK when no Channel::FlushAndSendEos() call returns an error, so none of those flush calls returned an error (a sketch of this propagation follows the two scenarios below).
> * FlushAndSendEos() calls DoTransmitDataRpc() as below. 
> {code}
>   rpc_status_ = DoTransmitDataRpc(&client, params, &res);
>   if (!rpc_status_.ok()) {
>     return Status(rpc_status_.code(),
>        Substitute("TransmitData(eos=true) to $0 failed:\n $1",
>         TNetworkAddressToString(address_), rpc_status_.msg().msg()));
>   }
>   return Status(res.status);
> {code}
> * rpc_status_ was OK, because the "TransmitData(eos=true) ... failed" message from the snippet above doesn't show up in the logs.
> Therefore there are two possible scenarios:
> # rpc_status_ was OK in spite of the RPC failing
> # res contained an OK status in spite of the send timing out.
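> A simplified sketch of that propagation (from memory, not the exact Impala source): the only error FIS::Exec() can see from FlushFinal() is one returned by a channel's FlushAndSendEos(), which in turn comes from rpc_status_ or res.status in the snippet above.
> {code}
> // Simplified sketch of DataStreamSender::FlushFinal(): if every channel's
> // FlushAndSendEos() returns OK (because both rpc_status_ and res.status were
> // OK), the fragment instance completes with status=OK despite the timeout.
> Status DataStreamSender::FlushFinal(RuntimeState* state) {
>   for (Channel* channel : channels_) {
>     // The first failing channel's status is what FIS::Exec() would report.
>     Status status = channel->FlushAndSendEos(state);
>     if (!status.ok()) return status;
>   }
>   return Status::OK();
> }
> {code}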
> The bug appears to stem from the fact that if no rows are sent over the connection, then AddData() is never called, only CloseSender().
> {code}
> void ImpalaServer::TransmitData(
>     TTransmitDataResult& return_val, const TTransmitDataParams& params) {
>   VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
>            << " node_id=" << params.dest_node_id
>            << " #rows=" << params.row_batch.num_rows
>            << " sender_id=" << params.sender_id
>            << " eos=" << (params.eos ? "true" : "false");
>   // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
>   // of having to copy its data
>   if (params.row_batch.num_rows > 0) {
>     Status status = exec_env_->ThriftStreamMgr()->AddData(
>         params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
>         params.sender_id);
>     status.SetTStatus(&return_val);
>     if (!status.ok()) {
>       // should we close the channel here as well?
>       return;
>     }
>   }
>   if (params.eos) {
>     exec_env_->stream_mgr()->CloseSender(
>         params.dest_fragment_instance_id, params.dest_node_id,
>         params.sender_id).SetTStatus(&return_val);
>   }
> }
> {code}
> CloseSender() doesn't return an error if it can't find the receiver, so no error status is ever propagated back to the sender. That would explain why the timeout was logged but no fragments failed.
> {code}
> Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
>     PlanNodeId dest_node_id, int sender_id) {
>   VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
>             << ", node=" << dest_node_id;
>   bool unused;
>   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
>       &unused);
>   if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
>     
>   {
>     // Remove any closed streams that have been in the cache for more than
>     // STREAM_EXPIRATION_TIME_MS.
>     lock_guard<mutex> l(lock_);
>     ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
>     int64_t now = MonotonicMillis();
>     int32_t before = closed_stream_cache_.size(); 
>     while (it != closed_stream_expirations_.end() && it->first < now) {
>       closed_stream_cache_.erase(it->second);
>       closed_stream_expirations_.erase(it++);
>     }
>     DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
>     int32_t after = closed_stream_cache_.size();
>     if (before != after) {
>       VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
>                  << ", eviction took: "
>                  << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
>     }
>   }
>   return Status::OK();
> }   
> {code}
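> For reference, the "Datastream sender timed-out" line in the log excerpt above comes from the FindRecvrOrWait() call in CloseSender(): it waits up to --datastream_sender_timeout_ms for the receiver to register, and on timeout it logs that warning and returns a null recvr, which CloseSender() then converts into Status::OK(). A rough sketch of that behaviour (assumed details; FindRecvr(), the wait mechanism and the logging are simplified):
> {code}
> // Rough sketch of FindRecvrOrWait() (assumed structure, not the actual source).
> shared_ptr<DataStreamRecvr> DataStreamMgr::FindRecvrOrWait(
>     const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
>     bool* already_unregistered) {
>   int64_t deadline = MonotonicMillis() + FLAGS_datastream_sender_timeout_ms;
>   while (true) {
>     // FindRecvr() is assumed to be a plain lookup of registered receivers.
>     shared_ptr<DataStreamRecvr> recvr =
>         FindRecvr(fragment_instance_id, dest_node_id, already_unregistered);
>     if (recvr.get() != NULL || *already_unregistered) return recvr;
>     if (MonotonicMillis() >= deadline) break;
>     SleepForMs(50);  // placeholder for the real wait/notify mechanism
>   }
>   // Timed out: log the message seen above and hand back a null recvr, which
>   // CloseSender() silently tolerates.
>   LOG(INFO) << "Datastream sender timed-out waiting for recvr for fragment "
>             << "instance: " << PrintId(fragment_instance_id);
>   return shared_ptr<DataStreamRecvr>();
> }
> {code}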


