You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:07:17 UTC
[jira] [Commented] (BEAM-8945) DirectStreamObserver race condition
[ https://issues.apache.org/jira/browse/BEAM-8945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174576#comment-17174576 ]
Beam JIRA Bot commented on BEAM-8945:
-------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> DirectStreamObserver race condition
> -----------------------------------
>
> Key: BEAM-8945
> URL: https://issues.apache.org/jira/browse/BEAM-8945
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-harness
> Affects Versions: 2.16.0
> Reporter: Ankur Goenka
> Priority: P2
> Labels: stale-P2
>
> The DirectStreamObserver can get into a dead lock if the channel become unhealthy of is not ready. An extended period of unhealthyness should result into failure.
> This is supported by following thread dumps where we see that 1 thread is having on getting the lock on actual stream observer while the remaining worker threads are waiting on the lock on the stream observer.
> The thread which is having lock on stream observer is probably in the while loop because the outboundObserver is not ready.
> Their is also 1 thread which is waiting to execute onError which means that the stream observer has become unhealthy and probably never going to get ready.
> 100s of threads are blocked on:
>
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
>
> One thread having the lock:
> State: TIMED_WAITING stack: —
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> java.util.concurrent.Phaser$QNode.block(Phaser.java:1142)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
> java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:796)
> org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:70)
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
>
> One thread waiting to execute onError
> State: BLOCKED stack: —
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onError(SynchronizedStreamObserver.java:53)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.closeAndTerminateOutstandingRequests(FnApiControlClient.java:117)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.access$300(FnApiControlClient.java:49)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onError(FnApiControlClient.java:174)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
>
> cc: [~lcwik]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)