You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/01/14 08:11:00 UTC

[jira] [Work logged] (HIVE-24626) LLAP: reader threads could be starvated if all IO elevator threads are busy to enqueue to another readers with full queue

     [ https://issues.apache.org/jira/browse/HIVE-24626?focusedWorklogId=535928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-535928 ]

ASF GitHub Bot logged work on HIVE-24626:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jan/21 08:10
            Start Date: 14/Jan/21 08:10
    Worklog Time Spent: 10m 
      Work Description: abstractdog opened a new pull request #1868:
URL: https://github.com/apache/hive/pull/1868


   ### What changes were proposed in this pull request?
   Llap daemon should enforce the proper number of io threads (>= executors)
   
   ### Why are the changes needed?
   It can cause query hang otherwise.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   On Cloudera Data Warehouse.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 535928)
    Remaining Estimate: 0h
            Time Spent: 10m

> LLAP: reader threads could be starvated if all IO elevator threads are busy to enqueue to another readers with full queue
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24626
>                 URL: https://issues.apache.org/jira/browse/HIVE-24626
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: executor_stack_cache_none_12_io_threads.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The root cause is that the readers cannot queue.offer items to full queues, which belong to consumers that are blocked on other consumers. 
> Scenario is like below:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 2 ......          llap       RUNNING      3          2        1        0       0       0
> Map 1                 llap       RUNNING    676          0      119      557       0       0
> Map 3                 llap       RUNNING    108          0       21       87       0      21
> Reducer 4             llap        INITED      1          0        0        1       0       0
> Map 5                 llap        INITED    108          0        0      108       0       0
> Reducer 6             llap        INITED      4          0        0        4       0       0
> Reducer 7             llap        INITED      1          0        0        1       0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
> ----------------------------------------------------------------------------------------------
> {code}
> Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is blocked on nextCvb:
> {code}
> "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 nid=0x147 waiting on condition [0x00007f0ce005d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0de8025e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> 	at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
> 	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
> 	at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
> 	at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
> 	at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
> 	at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
> 	at org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
> 	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> while all the elevator threads are blocked here (all of them tries to read for Map1):
> {code}
> "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 nid=0x267 waiting on condition [0x00007f0cd7af8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0e3095c480> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> 	at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
> 	at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
> 	at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
> 	at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
> 	at org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
> 	at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
> 	at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> the problem here is that, as far as I can see, all the elevator threads try to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot progress beyond a certain point, because wait for Map 2's input
> this is because the queues of LlapRecordReader instances of Map1 are full, otherwise LlapRecordReader.enqueueInternal should be able to offer the item from the elevator thread
> In my example:
> Map2's queue limit is: 50000 (Map2: mapjoin source)
> Map1's queue limit is: 6931 (Map1: mapjoin target)
> the limits are calculated according to data characteristics in LlapRecordReader.determineQueueLimit
> in my case, io threads for Map1 reached their queue limit, and cannot offer new items, so holding IO elevator threads, which cannot be used for reading for more important Map2 tasks, so Map1 tasks will eventually hang here:
> {code}
> "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00007f0de90049a8> (a java.util.concurrent.FutureTask)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:204)
> 	at org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
> 	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> Operator.asyncInitOperations typically contains a dependency on map join input.
> in my example Map2 reads small table, like microstrategy.lu_item, and Map1 reads a larger table microstrategy.order_detail
> this deadlock theoretically can be solved by:
> 1. IO elevator threads (reading for Map1) opt out as they cannot offer new items
> 2. the freed elevator threads can be used for Map2
> 3. Map2 can finish its work, and let Map1 proceed by providing the mapjoin data to it



--
This message was sent by Atlassian Jira
(v8.3.4#803005)