Posted to issues@hive.apache.org by "Jacques (Jira)" <ji...@apache.org> on 2023/05/04 13:57:00 UTC

[jira] [Commented] (HIVE-27078) Bucket Map Join can hang if the source vertex parallelism is changed by reducer autoparallelism

    [ https://issues.apache.org/jira/browse/HIVE-27078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719342#comment-17719342 ] 

Jacques commented on HIVE-27078:
--------------------------------

Any progress on this issue? We basically cannot enable BMJ on our cluster because of this.

> Bucket Map Join can hang if the source vertex parallelism is changed by reducer autoparallelism
> -----------------------------------------------------------------------------------------------
>
>                 Key: HIVE-27078
>                 URL: https://issues.apache.org/jira/browse/HIVE-27078
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Priority: Major
>
> Considering this DAG:
> {code}
> |         Map 1 <- Reducer 3 (CUSTOM_EDGE)           |
> |         Map 2 <- Map 4 (CUSTOM_EDGE)               |
> |         Map 5 <- Map 1 (CUSTOM_EDGE)               |
> |         Reducer 3 <- Map 2 (SIMPLE_EDGE)           |
> {code}
> This DAG was picked from a customer query and can be simplified further. The problematic vertices and edge are:
> {code}
> |         Map 1 <- Reducer 3 (CUSTOM_EDGE)           |
> {code}
> Reducer 3 was initially scheduled with 20 tasks, and auto reducer parallelism later decided that only 4 tasks were needed:
> {code}
> 2023-02-07 13:00:36,078 [INFO] [App Shared Pool - #4] |vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex: Reducer 3 from 20 to 4
> {code}
> In this case, Map 1 can hang because it still expects 20 inputs:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 4 .......... container     SUCCEEDED     16         16        0        0       0       0
> Map 2 .......... container     SUCCEEDED     48         48        0        0       0       0
> Reducer 3 ...... container     SUCCEEDED      4          4        0        0       0       0
> Map 1            container       RUNNING    192          0       13      179       0       0
> Map 5            container        INITED    241          0        0      241       0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 03/05  [===>>-----------------------] 13%   ELAPSED TIME: 901.18 s
> ----------------------------------------------------------------------------------------------
> {code}
> In the logs, it looks like this:
> {code}
> 2022-12-08 09:42:26,845 [INFO] [I/O Setup 2 Start: {Reducer 3}] |impl.ShuffleManager|: Reducer_3: numInputs=20, compressionCodec=org.apache.hadoop.io.compress.SnappyCodec, numFetchers=10, ifileBufferSize=4096, ifileReadAheadEnabled=true, ifileReadAheadLength=4194304, localDiskFetchEnabled=true, sharedFetchEnabled=false, keepAlive=true, keepAliveMaxConnections=20, connectionTimeout=180000, readTimeout=180000, bufferSize=8192, bufferSize=8192, maxTaskOutputAtOnce=20, asyncHttp=false
> ...receives the input event:
> 2022-12-08 09:42:27,134 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Routing events from heartbeat response to task, currentTaskAttemptId=attempt_1670331499491_1408_1_03_000039_0, eventCount=1 fromEventId=0 nextFromEventId=0
> ...but then it hangs while waiting for further inputs:
> "TezChild" #29 daemon prio=5 os_prio=0 tid=0x00007f3fae141000 nid=0x9581 waiting on condition [0x00007f3f737ba000]
>    java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x000000071ad90a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> 	at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
> 	at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
> 	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:1033)
> 	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
> 	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
> 	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:129)
> 	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:385)
> 	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:454)
> 	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.initializeOp(MapJoinOperator.java:241)
> 	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator.initializeOp(VectorMapJoinCommonOperator.java:555)
> 	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinGenerateResultOperator.initializeOp(VectorMapJoinGenerateResultOperator.java:111)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:374)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:571)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:523)
> 	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
> 	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:268)
> 	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:252)
> 	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
> 	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
> 	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
> 	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> 	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
> 	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:750)
> {code}
> As a quick, temporary workaround, we can disable auto reducer parallelism on a vertex if it is the source of a bucket map join (here: Reducer 3).
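
The mismatch described above can be reproduced in isolation. The following is a minimal sketch, not the Tez or Hive implementation: the class and method names are hypothetical, and the only detail taken from the stack trace is that the consumer blocks on a LinkedBlockingDeque while waiting for its next input. The sketch uses a timed poll() instead of take() so that it terminates and reports the stall rather than hanging.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical, self-contained model of a consumer whose expected input
// count was fixed before the producer's parallelism was reduced.
final class InputCountMismatchDemo {

    /**
     * Tries to read expectedInputs items from a queue that only ever
     * receives deliveredInputs items. Returns how many were read before
     * the timed poll gave up.
     */
    static int consume(int expectedInputs, int deliveredInputs, long timeoutMillis) {
        LinkedBlockingDeque<Integer> completedInputs = new LinkedBlockingDeque<>();

        // Producer side: one completed input per upstream task that really ran.
        for (int i = 0; i < deliveredInputs; i++) {
            completedInputs.add(i);
        }

        int received = 0;
        while (received < expectedInputs) {
            try {
                // The stack trace in this issue shows a blocking take() here,
                // which never returns once the queue runs dry. The timed
                // poll() is used only so this demo can finish.
                Integer input = completedInputs.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (input == null) {
                    break; // with take(), this would be the hang
                }
                received++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Consumer configured for 20 inputs, producer reduced to 4 tasks.
        int stale = consume(20, 4, 100);
        System.out.println("expected=20 delivered=4 received=" + stale
                + " stuck=" + (stale < 20));

        // Parallelism left unchanged, so both sides agree on 20.
        int consistent = consume(20, 20, 100);
        System.out.println("expected=20 delivered=20 received=" + consistent
                + " stuck=" + (consistent < 20));
    }
}
```

The second call models the proposed workaround: if the source vertex keeps its original parallelism, the consumer's expected count stays valid and it finishes. Setting hive.tez.auto.reducer.parallelism=false for the whole session would presumably be a blunter form of the same idea, at the cost of losing auto parallelism for every reducer rather than only for bucket map join sources.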



--
This message was sent by Atlassian Jira
(v8.20.10#820010)