Posted to issues@beam.apache.org by "Janek Bevendorff (Jira)" <ji...@apache.org> on 2022/01/27 14:29:00 UTC

[jira] [Commented] (BEAM-8980) Running GroupByKeyLoadTest on Portable Flink fails

    [ https://issues.apache.org/jira/browse/BEAM-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483174#comment-17483174 ] 

Janek Bevendorff commented on BEAM-8980:
----------------------------------------

This is definitely still an issue and has not been solved. I am running a Python Beam pipeline on Flink (portable runner with a Beam JobServer), and every time I use at least 8 concurrent TaskManagers, my job throws some totally unrelated gRPC exception after finishing a global combine. With 1-4 TaskManagers, I can hardly reproduce it.

The exception causes the job either to fail or to restart itself from scratch, only to fail again eventually. I don't actually have recovery options enabled in Flink at the moment, so I am not quite sure what is being retried and why, but restarts do happen occasionally (and the job usually fails again quickly thereafter).

Especially if the job runs for longer periods of time, it can also happen that the last TaskManager stays stuck in RUNNING mode indefinitely, long after the actual job has finished.

At the end of the global combine phase, I get the following exception in the TaskManager history:
{code:java}
2022-01-27 14:15:09,260 WARN  org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error cleaning up servers urn: "beam:env:external:v1"
payload: "\n\021\n\017localhost:50000"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:timer:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:param_windowed_value:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:custom_window:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:sharded_key:v1"
capabilities: "beam:protocol:progress_reporting:v0"
capabilities: "beam:protocol:harness_monitoring_infos:v1"
capabilities: "beam:protocol:worker_status:v1"
capabilities: "beam:combinefn:packed_python:v1"
capabilities: "beam:version:sdk_base:apache/beam_python3.8_sdk:2.35.0"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:transform:to_string:v1"
dependencies {
  type_urn: "beam:artifact:type:file:v1"
  type_payload: "\n\207\001/tmp/beam-artifact-staging/5b95d60d1d41127a9221f18e9f6e07e3c6c231ab7a10db788b8928cd1afd7c44/1-ref_Environment_default_e-workflow.tar.gz"
  role_urn: "beam:artifact:role:staging_to:v1"
  role_payload: "\n\017workflow.tar.gz"
}
java.lang.IllegalStateException: call already closed
    at org.apache.beam.vendor.grpc.v1p36p0.com.google.common.base.Preconditions.checkState(Preconditions.java:508) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:209) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:202) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:380) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.state.GrpcStateService.close(GrpcStateService.java:63) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.sdk.fn.server.GrpcFnServer.close(GrpcFnServer.java:156) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212) ~[blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.close(FlinkExecutableStageFunction.java:373) [blob_p-2bd29893637a50c9eafd5cd5f69967642654531f-6807d27912605370e1e560a6dda61acb:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-dist_2.12-1.13.0.jar:1.13.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}
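For readers trying to make sense of this trace: it bottoms out in gRPC's ServerCallImpl.closeInternal, where a Preconditions.checkState guard rejects a close with "call already closed". The chain suggests that GrpcStateService.close() calls onCompleted() during cache cleanup on a stream whose call had apparently already been closed earlier. The stdlib-only sketch below models just that failure mode; SingleCloseCall, IdempotentCloser and DoubleCloseDemo are hypothetical stand-ins written for illustration, not Beam or gRPC classes, and the guarded variant only shows one generic way to make close() idempotent, not the upstream fix for this issue.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

final class DoubleCloseDemo {

    /** Hypothetical stand-in for a server call that may be closed only once. */
    static final class SingleCloseCall {
        private boolean closeCalled = false;

        void close() {
            // Mirrors the checkState(..., "call already closed") guard seen in the trace.
            if (closeCalled) {
                throw new IllegalStateException("call already closed");
            }
            closeCalled = true;
        }
    }

    /** Hypothetical wrapper that forwards close() at most once, however often it is called. */
    static final class IdempotentCloser {
        private final SingleCloseCall call;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        IdempotentCloser(SingleCloseCall call) {
            this.call = call;
        }

        void close() {
            // compareAndSet lets exactly one caller, even across threads, reach call.close().
            if (closed.compareAndSet(false, true)) {
                call.close();
            }
        }
    }

    /** Closes an unguarded call twice and returns the message of the resulting exception. */
    static String closeTwiceUnguarded() {
        SingleCloseCall call = new SingleCloseCall();
        call.close(); // first close, e.g. when the stream ends normally
        try {
            call.close(); // second close, e.g. from a later cleanup path
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Closes a guarded call twice; the second close is a no-op. */
    static String closeTwiceGuarded() {
        IdempotentCloser closer = new IdempotentCloser(new SingleCloseCall());
        closer.close();
        closer.close();
        return "no exception";
    }

    public static void main(String[] args) {
        System.out.println(closeTwiceUnguarded()); // prints "call already closed"
        System.out.println(closeTwiceGuarded());   // prints "no exception"
    }
}
{code}

The sketch deliberately ignores why the call was closed the first time; in the real trace that question is exactly what remains open.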
And this in the Flink JobManager log:
{code:java}
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
    at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:103)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:115)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:718)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:362)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:267)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
        at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
        at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.close(BeamFnDataSizeBasedBufferingOutboundObserver.java:97)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.close(SdkHarnessClient.java:729)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:492)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:268)
        ... 6 more
        Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
            at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:516)
            ... 9 more
{code}
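The JobManager trace has a layered shape: the primary StatusRuntimeException is thrown from onNext() while processElements() is still sending data, and a second CANCELLED exception from the close path (the $closeResource frames, which some javac versions generate for try-with-resources) is attached as "Suppressed:". In Java, when the body of a try-with-resources block throws and the resource's close() throws too, the body's exception stays primary and the close() failure is added via addSuppressed. The stdlib-only sketch below reproduces that shape. CancellableStream and SuppressedChainDemo are hypothetical stand-ins, not the real gRPC ServerCallStreamObserver; the stand-in only models what the exception message implies (writing to a cancelled call throws unless an on-cancel handler is set) and uses a shortened message. The real trace nests one level deeper, with the "Processing bundle failed" IllegalStateException suppressed into the close-path exception, which this sketch does not model.

{code:java}
import java.util.ArrayList;
import java.util.List;

final class SuppressedChainDemo {

    /** Hypothetical stand-in for a server-side stream that the remote peer can cancel. */
    static final class CancellableStream implements AutoCloseable {
        private boolean cancelled = false;
        private Runnable onCancelHandler = null;
        final List<String> sent = new ArrayList<>();

        void setOnCancelHandler(Runnable handler) {
            this.onCancelHandler = handler;
        }

        void cancel() {
            cancelled = true;
            if (onCancelHandler != null) {
                onCancelHandler.run();
            }
        }

        void onNext(String element) {
            if (cancelled) {
                if (onCancelHandler == null) {
                    // Without a handler, writing to a cancelled call is an error.
                    throw new IllegalStateException("CANCELLED: call already cancelled");
                }
                return; // With a handler installed, late writes are silently dropped.
            }
            sent.add(element);
        }

        @Override
        public void close() {
            // Flushing on close hits the same cancelled call, so it can fail as well.
            onNext("<end of stream>");
        }
    }

    /** Sends two elements with a cancellation in between; returns the exception that escapes, or null. */
    static RuntimeException run(boolean installHandler) {
        try (CancellableStream stream = new CancellableStream()) {
            if (installHandler) {
                stream.setOnCancelHandler(() -> { });
            }
            stream.onNext("a");
            stream.cancel();    // simulates the peer cancelling mid-bundle
            stream.onNext("b"); // without a handler, the primary exception is thrown here
            return null;
        } catch (RuntimeException e) {
            // By now close() has run and any exception it threw is in e.getSuppressed().
            return e;
        }
    }

    public static void main(String[] args) {
        RuntimeException e = run(false);
        System.out.println("primary:    " + e.getMessage());
        for (Throwable s : e.getSuppressed()) {
            System.out.println("suppressed: " + s.getMessage());
        }
        System.out.println("with handler: " + run(true));
    }
}
{code}

Read this way, the suppressed exceptions in both traces are consequences of cleaning up after the primary CANCELLED error, not independent failures.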

> Running GroupByKeyLoadTest on Portable Flink fails
> --------------------------------------------------
>
>                 Key: BEAM-8980
>                 URL: https://issues.apache.org/jira/browse/BEAM-8980
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing
>            Reporter: Michał Walenia
>            Priority: P2
>
> When running a GBK load test using a Java harness image and a JobServer image generated from master, the load test fails with a cryptic exception:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Invalid job state: FAILED.
> 11:45:31 	at org.apache.beam.sdk.loadtests.JobFailure.handleFailure(JobFailure.java:55)
> 11:45:31 	at org.apache.beam.sdk.loadtests.LoadTest.run(LoadTest.java:106)
> 11:45:31 	at org.apache.beam.sdk.loadtests.CombineLoadTest.run(CombineLoadTest.java:66)
> 11:45:31 	at org.apache.beam.sdk.loadtests.CombineLoadTest.main(CombineLoadTest.java:169)
> {code}
>  
> After some investigation, I found a stacktrace of the error:
> {code:java}
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
>     at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
>     at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:339)
>     at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:90)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:102)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:278)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:201)
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
>     Suppressed: org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
>         at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
>         at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:339)
>         at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
>         at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.close(BeamFnDataSizeBasedBufferingOutboundObserver.java:84)
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:298)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:202)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:202)
>         ... 6 more
>         Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
>             at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:320)
>             ... 8 more
> {code}
> It seems that the core issue is an IllegalStateException thrown from SdkHarnessClient.java:320, related to BEAM-3962.
> It is important to note that the stacktrace above comes from the Flink cluster, not from the Gradle job that was executed.
> The link to Jenkins job is here: [https://builds.apache.org/job/beam_LoadTests_Java_GBK_Flink_Batch_PR/28/console]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)