Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2020/04/23 22:53:00 UTC

[jira] [Updated] (BEAM-9192) BigQuery IO on Dataflow runner fails (java.lang.ClassCastException) with --experiment=beam_fn_api

     [ https://issues.apache.org/jira/browse/BEAM-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-9192:
----------------------------------
    Status: Open  (was: Triage Needed)

> BigQuery IO on Dataflow runner fails (java.lang.ClassCastException) with --experiment=beam_fn_api
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9192
>                 URL: https://issues.apache.org/jira/browse/BEAM-9192
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0
>            Reporter: Bo Shi
>            Priority: Major
>
> {noformat}
> python repro.py \
>   --project=CHANGEME \
>   --runner=DataflowRunner \
>   --temp_location=gs://change-me/bshi/tmp \
>   --staging_location=gs://change-me/bshi/stg \
>   --experiment=beam_fn_api \
>   --save_main_session
> {noformat}
> The same repro code works with --runner=Direct. On Dataflow, it fails with:
> {noformat}
> java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
> 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> 	at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:87)
> 	at org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService$DeferredInboundDataClient.awaitCompletion(BeamFnDataGrpcService.java:134)
> 	at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.finish(RemoteGrpcPortReadOperation.java:83)
> 	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
> 	at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
> 	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
> 	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
> 	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
> 	at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195)
> 	at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123)
> 	Suppressed: java.lang.IllegalStateException: Already closed.
> 		at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:93)
> 		at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:220)
> 		at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91)
> 		... 6 more
> Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
> 	at org.apache.beam.runners.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:72)
> 	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> 	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> 	at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.consumeOutput(RemoteGrpcPortReadOperation.java:103)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:78)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:138)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
>  
> {code:python}
> import logging
> import sys
>
> import apache_beam
> import apache_beam.io.gcp.bigquery_file_loads
>
>
> def main():
>     logging.getLogger().setLevel(logging.DEBUG)
>     # Parse the command-line flags, skipping the script name in sys.argv[0].
>     options = apache_beam.options.pipeline_options.PipelineOptions(flags=sys.argv[1:])
>     with apache_beam.Pipeline(options=options) as p:
>         (
>             p
>             # Two in-memory rows whose keys match the table schema below.
>             | apache_beam.Create(
>                 [
>                     {"some_str": "hello", "some_int": 1},
>                     {"some_str": "world", "some_int": 2},
>                 ]
>             )
>             # Write the rows to <project>:bo_test.flow using BigQuery file loads.
>             | apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads(
>                 options.view_as(
>                     apache_beam.options.pipeline_options.GoogleCloudOptions
>                 ).project
>                 + ":bo_test.flow",
>                 create_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
>                 write_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.WRITE_TRUNCATE,
>                 schema={
>                     "fields": [
>                         {"name": "some_str", "type": "STRING"},
>                         {"name": "some_int", "type": "INTEGER"},
>                     ]
>                 },
>             )
>         )
>
>
> if __name__ == "__main__":
>     main()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)