Posted to issues@hive.apache.org by "László Bodor (Jira)" <ji...@apache.org> on 2021/01/14 08:13:00 UTC

[jira] [Commented] (HIVE-24626) LLAP: reader threads could be starvated if all IO elevator threads are busy to enqueue to another readers with full queue

    [ https://issues.apache.org/jira/browse/HIVE-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264680#comment-17264680 ] 

László Bodor commented on HIVE-24626:
-------------------------------------

Finally, it turned out that the query can hang when:
hive.llap.io.threadpool.size <  hive.llap.daemon.num.executors
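
As a rough illustration of the condition above, a pre-flight check could compare the two settings before a query is run. This is only a sketch: the class LlapPoolSizeCheck and the method mayStarve are hypothetical names, not part of Hive, and the configuration is passed in as a plain map rather than read through any Hive API.

```java
import java.util.Map;

// Hypothetical helper, not part of Hive: flags the configuration
// described in the comment above.
final class LlapPoolSizeCheck {
    static final String IO_THREADS = "hive.llap.io.threadpool.size";
    static final String EXECUTORS = "hive.llap.daemon.num.executors";

    // Returns true when there are fewer IO threads than executors,
    // which is the condition under which the query was seen to hang.
    // Assumes both keys are present and hold integer values.
    static boolean mayStarve(Map<String, String> conf) {
        int ioThreads = Integer.parseInt(conf.get(IO_THREADS));
        int executors = Integer.parseInt(conf.get(EXECUTORS));
        return ioThreads < executors;
    }
}
```

The check only reports the risky configuration; whether a given query actually hangs still depends on the queue limits and the join shape described in the issue.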

> LLAP: reader threads could be starvated if all IO elevator threads are busy to enqueue to another readers with full queue
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24626
>                 URL: https://issues.apache.org/jira/browse/HIVE-24626
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: executor_stack_cache_none_12_io_threads.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The root cause is that the readers cannot queue.offer items to full queues that belong to consumers which are themselves blocked on other consumers.
> The scenario looks like this:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 2 ......          llap       RUNNING      3          2        1        0       0       0
> Map 1                 llap       RUNNING    676          0      119      557       0       0
> Map 3                 llap       RUNNING    108          0       21       87       0      21
> Reducer 4             llap        INITED      1          0        0        1       0       0
> Map 5                 llap        INITED    108          0        0      108       0       0
> Reducer 6             llap        INITED      4          0        0        4       0       0
> Reducer 7             llap        INITED      1          0        0        1       0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
> ----------------------------------------------------------------------------------------------
> {code}
> Map2 is MAPJOINed to Map1. In an LLAP daemon, the Map2 task, which runs forever, is blocked in nextCvb:
> {code}
> "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 nid=0x147 waiting on condition [0x00007f0ce005d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0de8025e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> 	at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
> 	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
> 	at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
> 	at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
> 	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
> 	at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
> 	at org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
> 	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> while all the elevator threads are blocked here (all of them try to read for Map1):
> {code}
> "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 nid=0x267 waiting on condition [0x00007f0cd7af8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0e3095c480> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> 	at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
> 	at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
> 	at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
> 	at org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> The problem here is that, as far as I can see, all the elevator threads try to offer to LlapRecordReaders that belong to Map 1; however, Map 1 tasks cannot progress beyond a certain point, because they wait for Map 2's input.
> The elevator threads are blocked because the queues of the LlapRecordReader instances of Map1 are full; otherwise LlapRecordReader.enqueueInternal would be able to offer the item from the elevator thread.
> In my example:
> Map2's queue limit is: 50000 (Map2: mapjoin source)
> Map1's queue limit is: 6931 (Map1: mapjoin target)
> The limits are calculated according to data characteristics in LlapRecordReader.determineQueueLimit.
> In my case, the IO threads reading for Map1 reached their queue limit and cannot offer new items, so they keep holding the IO elevator threads, which therefore cannot be used to read for the more important Map2 tasks. As a result, the Map1 tasks eventually hang here:
> {code}
> "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0de90049a8> (a java.util.concurrent.FutureTask)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:204)
> 	at org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
> 	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> Operator.asyncInitOperations typically contains a dependency on map join input.
> In my example, Map2 reads a small table, like microstrategy.lu_item, and Map1 reads a larger table, microstrategy.order_detail.
> This deadlock can theoretically be solved as follows:
> 1. IO elevator threads (reading for Map1) opt out as they cannot offer new items
> 2. the freed elevator threads can be used for Map2
> 3. Map2 can finish its work, and let Map1 proceed by providing the mapjoin data to it
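
The three steps above can be sketched with a bounded java.util.concurrent queue. This is a minimal illustration under stated assumptions, not the actual LLAP change: OptOutProducer, add, drain and pendingCount are hypothetical names, and the real LlapRecordReader and elevator pool are not modelled. The idea is that a producer whose timed offer fails returns control to the caller instead of retrying, so the thread it runs on can be handed to another reader.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a producer that gives up its thread when the
// consumer's bounded queue stays full, instead of blocking on it.
final class OptOutProducer<T> {
    private final BlockingQueue<T> consumerQueue;
    // Items read but not yet delivered; kept so work can resume later.
    private final Queue<T> pending = new ArrayDeque<>();

    OptOutProducer(BlockingQueue<T> consumerQueue) {
        this.consumerQueue = consumerQueue;
    }

    void add(T item) {
        pending.add(item);
    }

    int pendingCount() {
        return pending.size();
    }

    // Offers pending items in order. Returns true if all were delivered,
    // false if an offer timed out (or the thread was interrupted), in
    // which case the caller should release the thread and reschedule.
    boolean drain(long timeoutMs) {
        while (!pending.isEmpty()) {
            T next = pending.peek();
            try {
                if (!consumerQueue.offer(next, timeoutMs, TimeUnit.MILLISECONDS)) {
                    return false; // queue still full: opt out
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            pending.remove(); // delivered, drop from the backlog
        }
        return true;
    }
}
```

With a design like this, a task reading for Map1 that returns false from drain would be resubmitted to the pool later, which gives queued Map2 reads a chance to run in between. How rescheduling and ordering should work in the real elevator pool is left open here.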



--
This message was sent by Atlassian Jira
(v8.3.4#803005)