Posted to issues@beam.apache.org by "Ankur Goenka (Jira)" <ji...@apache.org> on 2020/01/08 00:58:00 UTC

[jira] [Updated] (BEAM-8945) DirectStreamObserver race condition

     [ https://issues.apache.org/jira/browse/BEAM-8945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ankur Goenka updated BEAM-8945:
-------------------------------
    Description: 
The DirectStreamObserver can get into a deadlock if the channel becomes unhealthy or is not ready. An extended period of unhealthiness should result in a failure.

This is supported by the following thread dumps, where one thread holds the lock on the actual stream observer while the remaining worker threads wait for that lock.
 The thread holding the lock on the stream observer is probably in the while loop because the outboundObserver is not ready.
 There is also one thread waiting to execute onError, which means that the stream observer has become unhealthy and will probably never become ready.
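
The pattern described above can be modelled with a small, self-contained Java sketch. It is not the Beam source: the class name, the fields, and the 10 ms poll interval are hypothetical, and a plain monitor stands in for the lock taken by SynchronizedStreamObserver. One thread takes the lock and polls a Phaser for a readiness signal that never arrives, so a second thread that tries to report an error cannot enter the monitor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LockHeldWhileWaitingSketch {
  // Hypothetical stand-ins, not Beam classes.
  private final Object lock = new Object();     // models the synchronized wrapper's monitor
  private final Phaser phaser = new Phaser(1);  // would advance when the stream turns ready
  private volatile boolean ready = false;       // never set: the channel stays unhealthy
  private final CountDownLatch lockTaken = new CountDownLatch(1);

  void onNext() throws InterruptedException {
    synchronized (lock) {
      lockTaken.countDown();
      int phase = phaser.getPhase();
      while (!ready) {
        try {
          phaser.awaitAdvanceInterruptibly(phase, 10, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // Still not ready: loop again without ever releasing the lock.
        }
      }
    }
  }

  void onError() {
    synchronized (lock) {
      // Would close the stream, but cannot run while onNext holds the lock.
    }
  }

  /** Returns the state observed for the thread that tries to call onError. */
  static Thread.State observeErrorThreadState() {
    LockHeldWhileWaitingSketch sketch = new LockHeldWhileWaitingSketch();
    Thread writer = new Thread(() -> {
      try {
        sketch.onNext();
      } catch (InterruptedException e) {
        // Interrupted by the demo below so that the JVM can exit.
      }
    });
    Thread errorReporter = new Thread(sketch::onError);
    try {
      writer.start();
      sketch.lockTaken.await();  // the writer now owns the monitor
      errorReporter.start();
      Thread.State state = errorReporter.getState();
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (state != Thread.State.BLOCKED && System.nanoTime() < deadline) {
        Thread.sleep(5);
        state = errorReporter.getState();
      }
      writer.interrupt();  // break the wait loop so both threads can finish
      writer.join();
      errorReporter.join();
      return state;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("onError thread state: " + observeErrorThreadState());
  }
}
```

In this model the only way out is the interrupt issued by the demo; without it, the writer keeps polling and every other caller stays parked on the monitor, which matches the shape of the dumps below.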

Hundreds of threads are blocked on:

 
 org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
 org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
 org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)

 

 

One thread holding the lock:

State: TIMED_WAITING stack: —
 sun.misc.Unsafe.park(Native Method)
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 java.util.concurrent.Phaser$QNode.block(Phaser.java:1142)
 java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
 java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:796)
 org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:70)
 org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
 org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
 org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
 org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)

 

 

One thread waiting to execute onError:

State: BLOCKED stack: —
 org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onError(SynchronizedStreamObserver.java:53)
 org.apache.beam.runners.fnexecution.control.FnApiControlClient.closeAndTerminateOutstandingRequests(FnApiControlClient.java:117)
 org.apache.beam.runners.fnexecution.control.FnApiControlClient.access$300(FnApiControlClient.java:49)
 org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onError(FnApiControlClient.java:174)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
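
One way to make an extended period of unhealthiness result in a failure is to bound the wait. The following is a minimal sketch of that idea only, not the fix adopted in Beam: awaitReady, its parameters, and the choice of IllegalStateException are assumptions made for illustration.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

class BoundedReadyWaitSketch {
  /**
   * Polls isReady until it returns true, and throws once maxWaitMillis has elapsed.
   * All names and the exception type are hypothetical.
   */
  static void awaitReady(
      Phaser phaser, BooleanSupplier isReady, long pollMillis, long maxWaitMillis)
      throws InterruptedException {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    while (true) {
      // Read the phase before checking readiness so that an advance is not missed.
      int phase = phaser.getPhase();
      if (isReady.getAsBoolean()) {
        return;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new IllegalStateException(
            "Stream not ready after " + maxWaitMillis + " ms; treating it as failed");
      }
      try {
        phaser.awaitAdvanceInterruptibly(phase, pollMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // Poll interval elapsed: re-check readiness and the deadline.
      }
    }
  }

  /** True if a stream that never becomes ready makes awaitReady fail. */
  static boolean failsWhenNeverReady() {
    try {
      awaitReady(new Phaser(1), () -> false, 10, 100);
      return false;
    } catch (IllegalStateException e) {
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** True if a stream that is already ready lets awaitReady return normally. */
  static boolean returnsWhenReady() {
    try {
      awaitReady(new Phaser(1), () -> true, 10, 100);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("fails when never ready: " + failsWhenNeverReady());
    System.out.println("returns when ready: " + returnsWhenReady());
  }
}
```

With a deadline of this kind, the thread holding the lock eventually throws and releases the monitor, so the callers blocked in onNext and onError can make progress instead of waiting indefinitely.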

 

 

cc: [~lcwik] 


> DirectStreamObserver race condition
> -----------------------------------
>
>                 Key: BEAM-8945
>                 URL: https://issues.apache.org/jira/browse/BEAM-8945
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-harness
>    Affects Versions: 2.16.0
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major


