Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:07:17 UTC

[jira] [Commented] (BEAM-8945) DirectStreamObserver race condition

    [ https://issues.apache.org/jira/browse/BEAM-8945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174576#comment-17174576 ] 

Beam JIRA Bot commented on BEAM-8945:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> DirectStreamObserver race condition
> -----------------------------------
>
>                 Key: BEAM-8945
>                 URL: https://issues.apache.org/jira/browse/BEAM-8945
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-harness
>    Affects Versions: 2.16.0
>            Reporter: Ankur Goenka
>            Priority: P2
>              Labels: stale-P2
>
> The DirectStreamObserver can get into a deadlock if the channel becomes unhealthy or is not ready. An extended period of unhealthiness should result in a failure.
> This is supported by the following thread dumps, where one thread holds the lock on the actual stream observer while the remaining worker threads wait for the lock on the stream observer.
>  The thread holding the lock on the stream observer is probably in the while loop because the outboundObserver is not ready.
>  There is also one thread waiting to execute onError, which means that the stream observer has become unhealthy and is probably never going to become ready.
> Hundreds of threads are blocked on:
>  
>  org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
>  org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
>  org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
>  
>  
> One thread holding the lock:
> State: TIMED_WAITING stack: —
>  sun.misc.Unsafe.park(Native Method)
>  java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>  java.util.concurrent.Phaser$QNode.block(Phaser.java:1142)
>  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
>  java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:796)
>  org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:70)
>  org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
>  org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
>  org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
>  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
>  
>  
> One thread waiting to execute onError:
> State: BLOCKED stack: —
>  org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onError(SynchronizedStreamObserver.java:53)
>  org.apache.beam.runners.fnexecution.control.FnApiControlClient.closeAndTerminateOutstandingRequests(FnApiControlClient.java:117)
>  org.apache.beam.runners.fnexecution.control.FnApiControlClient.access$300(FnApiControlClient.java:49)
>  org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onError(FnApiControlClient.java:174)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>  org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
>  
>  
> cc: [~lcwik] 
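
The description above asks that an extended period of unhealthiness result in a failure rather than an indefinite wait. A minimal sketch of that idea follows. It is not the actual DirectStreamObserver code: the class name BoundedReadinessWait, its constructor parameters, and the choice of IllegalStateException are all hypothetical, and the readiness check is modeled as a plain BooleanSupplier instead of a gRPC observer. The only part taken from the thread dump is the use of Phaser.awaitAdvanceInterruptibly for the wait.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a readiness wait that gives up after a deadline. */
final class BoundedReadinessWait {
  private final Phaser phaser;           // advanced by whoever signals "ready"
  private final BooleanSupplier isReady; // stand-in for the observer's readiness check
  private final long pollMillis;         // how long one wait on the phaser may take
  private final long maxWaitMillis;      // total time before the wait is treated as a failure

  BoundedReadinessWait(
      Phaser phaser, BooleanSupplier isReady, long pollMillis, long maxWaitMillis) {
    this.phaser = phaser;
    this.isReady = isReady;
    this.pollMillis = pollMillis;
    this.maxWaitMillis = maxWaitMillis;
  }

  /** Returns once isReady is true; throws if it stays false past the deadline. */
  void awaitReady() throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    // Read the phase before checking readiness so that a signal arriving
    // between the check and the wait makes the wait return immediately.
    int phase = phaser.getPhase();
    while (!isReady.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        // Fail instead of holding the caller's lock forever.
        throw new IllegalStateException(
            "Output channel not ready after " + maxWaitMillis + " ms");
      }
      try {
        phase = phaser.awaitAdvanceInterruptibly(phase, pollMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // No signal within pollMillis; loop to re-check readiness and the deadline.
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Phaser phaser = new Phaser(1);
    AtomicBoolean ready = new AtomicBoolean(false);
    BoundedReadinessWait gate = new BoundedReadinessWait(phaser, ready::get, 10, 1000);

    // Simulate the channel becoming ready on another thread.
    Thread signaller = new Thread(() -> {
      ready.set(true);
      phaser.arrive();
    });
    signaller.start();
    gate.awaitReady();
    signaller.join();
    System.out.println("channel became ready");

    // Simulate a channel that never becomes ready.
    try {
      new BoundedReadinessWait(new Phaser(1), () -> false, 10, 100).awaitReady();
    } catch (IllegalStateException expected) {
      System.out.println("gave up: " + expected.getMessage());
    }
  }
}
```

With a bound like this, the thread holding the lock would eventually throw and release it, so the blocked onNext callers and the pending onError call could make progress. The actual timeout value and the way the failure is surfaced to the runner are design choices this sketch does not settle.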



--
This message was sent by Atlassian Jira
(v8.3.4#803005)