Posted to issues@impala.apache.org by "Sailesh Mukil (JIRA)" <ji...@apache.org> on 2017/09/07 22:56:00 UTC
[jira] [Resolved] (IMPALA-5910) Data stream sender timeout causes query hang when 0 rows sent through channel
[ https://issues.apache.org/jira/browse/IMPALA-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sailesh Mukil resolved IMPALA-5910.
-----------------------------------
Resolution: Duplicate
> Data stream sender timeout causes query hang when 0 rows sent through channel
> -----------------------------------------------------------------------------
>
> Key: IMPALA-5910
> URL: https://issues.apache.org/jira/browse/IMPALA-5910
> Project: IMPALA
> Issue Type: Bug
> Components: Distributed Exec
> Affects Versions: Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
> Reporter: Tim Armstrong
> Priority: Critical
> Labels: hang
>
> We saw this bug on an internal cluster that was under heavy load and having trouble opening connections. It manifested as a query hang.
> Here are the fragments:
> {code}
> Host       Num. instances  Num. remaining instances  Done   Peak mem. consumption  Time since last report (ms)
> h40:22000  1               1                         false  8192                   3369
> h30:22000  2               0                         true   112573520              34000630
> h24:22000  2               1                         false  96816946               3371
> {code}
> The thread is hung waiting on the exchange:
> {code}
> https://h24:25000/thread-group?group=fragment-execution
> Thread name                                               Id     Cumulative User CPU(s)  Cumulative Kernel CPU(s)  Cumulative IO-wait(s)
> profile-report (finst:641169ef2c0c8ea:8edf87e100000003)   31426  1.92                    0.56                      0
> exec-finstance (finst:641169ef2c0c8ea:8edf87e100000003)   31420  0.14                    0                         0
> Pstack output:
> Thread 1054 (Thread 0x7fbe693cd700 (LWP 31420)):
> #0 0x0000003fdd40b5bc in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
> #1 0x0000000000820a73 in boost::condition_variable::wait(boost::unique_lock<boost::mutex>&) ()
> #2 0x0000000000a1cd01 in impala::DataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
> #3 0x0000000000a1d050 in impala::DataStreamRecvr::GetBatch(impala::RowBatch**) ()
> #4 0x0000000000c3247d in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
> #5 0x0000000000c32e61 in impala::ExchangeNode::Open(impala::RuntimeState*) ()
> #6 0x0000000000ccd505 in impala::PartitionedAggregationNode::Open(impala::RuntimeState*) ()
> #7 0x0000000000a50f8d in impala::FragmentInstanceState::Open() ()
> #8 0x0000000000a526fb in impala::FragmentInstanceState::Exec() ()
> #9 0x0000000000a2f158 in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
> #10 0x0000000000bd3a72 in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*) ()
> #11 0x0000000000bd41d4 in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*), boost::_bi::list4<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
> #12 0x0000000000e5fb9a in ?? ()
> {code}
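> To make the hang mechanism concrete, here is a minimal sketch of the wait at the top of that stack (an illustrative stand-in, not the actual DataStreamRecvr::SenderQueue code; the queue and sender count are simplified). The wait has no deadline, so if the last sender's EOS is never registered, GetBatch() blocks indefinitely:
> {code}
> // Illustrative sketch only: simplified stand-in for the receiver-side wait.
> #include <condition_variable>
> #include <deque>
> #include <mutex>
>
> struct RowBatch;  // opaque here
>
> class SenderQueueSketch {
>  public:
>   // Blocks until a batch arrives or every sender has sent EOS.
>   RowBatch* GetBatch() {
>     std::unique_lock<std::mutex> l(lock_);
>     // No deadline on this wait: if the last EOS is dropped, this is the
>     // pthread_cond_wait frame at the top of the pstack, held forever.
>     cv_.wait(l, [this] {
>       return !batch_queue_.empty() || num_remaining_senders_ == 0;
>     });
>     if (batch_queue_.empty()) return nullptr;  // end-of-stream
>     RowBatch* batch = batch_queue_.front();
>     batch_queue_.pop_front();
>     return batch;
>   }
>
>   // Runs when a sender's EOS is processed; in this bug it never runs for
>   // the timed-out sender, so num_remaining_senders_ never reaches 0.
>   void SenderDone() {
>     std::lock_guard<std::mutex> l(lock_);
>     if (--num_remaining_senders_ == 0) cv_.notify_all();
>   }
>
>  private:
>   std::mutex lock_;
>   std::condition_variable cv_;
>   std::deque<RowBatch*> batch_queue_;
>   int num_remaining_senders_ = 1;
> };
> {code}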
> The plan is:
> {code}
> F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
> Per-Host Resources: mem-estimate=240.00MB mem-reservation=34.00MB
> 01:AGGREGATE [STREAMING]
> | group by: c1, c2, concat(CAST(c3 AS STRING), ';', c4)
> | mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
> | tuple-ids=1 row-size=46B cardinality=unavailable
> |
> 00:SCAN HDFS [d.t, RANDOM]
> partitions=1/3000 files=2 size=82.09MB
> predicates: identifier = 'xxxxxxxxxxx'
> stats-rows=unavailable extrapolated-rows=disabled
> table stats: rows=unavailable size=unavailable
> columns missing stats: ....
> parquet dictionary predicates: identifier = 'xxxxxxxxxxxx'
> mem-estimate=112.00MB mem-reservation=0B
> tuple-ids=0 row-size=45B cardinality=unavailable
> {code}
> I looked at the profile and saw that all the scans produced 0 rows.
> I saw in one log that there was a data stream sender timeout:
> {code}
> h24-impalad.INFO:I0906 11:55:25.695996 2894 data-stream-mgr.cc:130] Datastream sender timed-out waiting for recvr for fragment instance: 641169ef2c0c8ea:8edf87e100000003 (time-out was: 2m). Increase --datastream_sender_timeout_ms if you see this message frequently.
> {code}
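> For context on the message above: when a TransmitData RPC arrives before the target receiver has registered, the receiving node's stream manager waits a bounded time for it to show up. A simplified sketch of that timed wait (illustrative only; the real FindRecvrOrWait() is more involved, and the "2m" bound comes from --datastream_sender_timeout_ms):
> {code}
> // Illustrative sketch of the bounded receiver lookup behind the log line.
> #include <chrono>
> #include <condition_variable>
> #include <map>
> #include <memory>
> #include <mutex>
>
> struct DataStreamRecvr;  // opaque here
>
> class StreamMgrSketch {
>  public:
>   // Waits up to timeout_ms for the receiver to register; a null return is
>   // what triggers the "timed-out waiting for recvr" message above.
>   std::shared_ptr<DataStreamRecvr> FindRecvrOrWait(
>       int64_t finstance_key, int64_t timeout_ms) {
>     std::unique_lock<std::mutex> l(lock_);
>     bool registered = cv_.wait_for(l, std::chrono::milliseconds(timeout_ms),
>         [&] { return recvrs_.count(finstance_key) > 0; });
>     if (!registered) return nullptr;  // timed out
>     return recvrs_[finstance_key];
>   }
>
>   // Called when the receiving fragment instance sets up its exchange.
>   void RegisterRecvr(int64_t finstance_key,
>       std::shared_ptr<DataStreamRecvr> recvr) {
>     std::lock_guard<std::mutex> l(lock_);
>     recvrs_[finstance_key] = recvr;
>     cv_.notify_all();
>   }
>
>  private:
>   std::mutex lock_;
>   std::condition_variable cv_;
>   std::map<int64_t, std::shared_ptr<DataStreamRecvr>> recvrs_;
> };
> {code}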
> It looks like all non-hung instances for that query finished with an OK status, which means that FIS::Exec() returned OK.
> {noformat}
> h24-impalad.INFO:I0906 11:56:41.212296 31421 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000001 #in-flight=613 status=OK
> h30-impalad.INFO:I0906 11:55:26.268643 4850 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000002 #in-flight=137 status=OK
> h30-impalad.INFO:I0906 11:56:41.213317 4848 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000004 #in-flight=121 status=OK
> {noformat}
> Tracing from this OK status back to the data stream sender timeout, we get:
> * FIS::Exec() should fail with an error from DataStreamSender::Send() or DataStreamSender::FlushFinal() if there's a timeout.
> * For this query, Send() is not applicable because the fragment returned 0 rows.
> * Therefore DataStreamSender::FlushFinal() should have returned an error.
> * FlushFinal() returns an error only if one of the Channel::FlushAndSendEos() calls fails, so none of those flush calls can have returned an error (see the sketch after the scenarios below).
> * FlushAndSendEos() calls DoTransmitDataRpc() as below.
> {code}
> rpc_status_ = DoTransmitDataRpc(&client, params, &res);
> if (!rpc_status_.ok()) {
>   return Status(rpc_status_.code(),
>       Substitute("TransmitData(eos=true) to $0 failed:\n $1",
>           TNetworkAddressToString(address_), rpc_status_.msg().msg()));
> }
> return Status(res.status);
> {code}
> * rpc_status_ must have been OK, because the "TransmitData(eos=true) to $0 failed" error message doesn't show up in the logs.
> Therefore we have two scenarios:
> # rpc_status_ was OK in spite of the RPC failing
> # res contained an OK status in spite of the send timing out.
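> To make the error propagation these scenarios hinge on explicit, here is a sketch of the chain with stand-in types (illustrative only, not Impala's actual FlushFinal()):
> {code}
> #include <string>
> #include <vector>
>
> // Minimal stand-ins for the Impala types involved (illustrative only).
> struct Status {
>   std::string msg;
>   bool ok() const { return msg.empty(); }
>   static Status OK() { return Status{}; }
> };
>
> struct Channel {
>   Status FlushAndSendEos();  // sends the eos=true TransmitData RPC
> };
>
> // FlushFinal() can only surface the timeout if some FlushAndSendEos()
> // call returns a non-OK Status.
> Status FlushFinal(std::vector<Channel*>& channels) {
>   Status overall = Status::OK();
>   for (Channel* channel : channels) {
>     Status s = channel->FlushAndSendEos();
>     // Keep the first error. If every channel reports OK, as happened in
>     // this bug, FlushFinal() returns OK and FIS::Exec() succeeds.
>     if (!s.ok() && overall.ok()) overall = s;
>   }
>   return overall;
> }
> {code}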
> The bug appears to stem from the fact that if no rows are sent over the connection, then AddData() is never called, only CloseSender().
> {code}
> void ImpalaServer::TransmitData(
>     TTransmitDataResult& return_val, const TTransmitDataParams& params) {
>   VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
>            << " node_id=" << params.dest_node_id
>            << " #rows=" << params.row_batch.num_rows
>            << " sender_id=" << params.sender_id
>            << " eos=" << (params.eos ? "true" : "false");
>   // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
>   // of having to copy its data
>   if (params.row_batch.num_rows > 0) {
>     Status status = exec_env_->ThriftStreamMgr()->AddData(
>         params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
>         params.sender_id);
>     status.SetTStatus(&return_val);
>     if (!status.ok()) {
>       // should we close the channel here as well?
>       return;
>     }
>   }
>   if (params.eos) {
>     exec_env_->stream_mgr()->CloseSender(
>         params.dest_fragment_instance_id, params.dest_node_id,
>         params.sender_id).SetTStatus(&return_val);
>   }
> }
> {code}
> CloseSender() doesn't return an error if it can't find the receiver, so no error status is ever propagated back to the caller. That would explain why the timeout was logged but no fragments failed.
> {code}
> Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
>     PlanNodeId dest_node_id, int sender_id) {
>   VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
>             << ", node=" << dest_node_id;
>   bool unused;
>   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
>       &unused);
>   if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
>
>   {
>     // Remove any closed streams that have been in the cache for more than
>     // STREAM_EXPIRATION_TIME_MS.
>     lock_guard<mutex> l(lock_);
>     ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
>     int64_t now = MonotonicMillis();
>     int32_t before = closed_stream_cache_.size();
>     while (it != closed_stream_expirations_.end() && it->first < now) {
>       closed_stream_cache_.erase(it->second);
>       closed_stream_expirations_.erase(it++);
>     }
>     DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
>     int32_t after = closed_stream_cache_.size();
>     if (before != after) {
>       VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
>                  << ", eviction took: "
>                  << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
>     }
>   }
>   return Status::OK();
> }
> {code}
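> One possible shape of a fix would be to have CloseSender() surface the lookup failure instead of swallowing it, so the error flows back through TransmitData() -> FlushAndSendEos() -> FlushFinal() -> FIS::Exec() and the query fails instead of hanging. This is a sketch only, not the actual change (which is tracked in the issue this one duplicates); the meaning of the bool out-parameter of FindRecvrOrWait() is an assumption here:
> {code}
> // Sketch of a fix direction (not the actual patch). Assumes the bool
> // out-param of FindRecvrOrWait() reports whether the stream was already
> // closed normally (i.e., found in the closed-stream cache).
> Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
>     PlanNodeId dest_node_id, int sender_id) {
>   bool already_closed;
>   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id,
>       dest_node_id, &already_closed);
>   if (recvr.get() != NULL) {
>     recvr->RemoveSender(sender_id);
>   } else if (!already_closed) {
>     // The receiver never registered within the timeout. Return an error
>     // instead of OK so the sender's FlushAndSendEos() fails and the query
>     // aborts rather than hanging on the exchange.
>     return Status(Substitute(
>         "Timed out closing sender $0 to receiver $1 (node $2)",
>         sender_id, PrintId(fragment_instance_id), dest_node_id));
>   }
>   // ... closed-stream cache maintenance unchanged from above ...
>   return Status::OK();
> }
> {code}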
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)