Posted to issues@beam.apache.org by "Dennis Waldron (Jira)" <ji...@apache.org> on 2022/03/18 14:19:00 UTC

[jira] [Commented] (BEAM-13954) BigQueryIO Storage Write API: Stream is already closed

    [ https://issues.apache.org/jira/browse/BEAM-13954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508802#comment-17508802 ] 

Dennis Waldron commented on BEAM-13954:
---------------------------------------

I believe I've found the root cause of this.

When an [error|https://github.com/apache/beam/blob/release-2.37.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java#L252] occurs, [invalidateWriteStream()|https://github.com/apache/beam/blob/release-2.37.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java#L205-L212] is called, which closes the stream and sets the streamAppendClient to null. On retry, a StreamAppendClient is retrieved via [getWriteStream()|https://github.com/apache/beam/blob/release-2.37.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java#L182-L199]. As a result of streamAppendClient being null, a client is returned from the APPEND_CLIENTS cache. However, this client was just closed by invalidateWriteStream(). This results in the [FAILED_PRECONDITION|https://github.com/googleapis/java-bigquerystorage/blame/b89273d94caf671488e538bda6a5fc269000bfa5/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java#L290-L296] as attempts are made to append data to a closed write stream.
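To make the failure sequence concrete, here is a minimal, self-contained Java sketch of the pattern. The names mirror the linked Beam source for readability, but the types are simplified stand-ins, not the real implementation:

{code:java}
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for Beam's StorageApiWriteUnshardedRecords internals.
class StaleCacheDemo {
  static class StreamAppendClient {
    private boolean closed = false;

    void append(String row) {
      if (closed) {
        throw new IllegalStateException("FAILED_PRECONDITION: Stream is already closed");
      }
      // ...transmit the row...
    }

    void close() {
      closed = true;
    }
  }

  // Stand-in for the static APPEND_CLIENTS cache, keyed by stream name.
  static final ConcurrentHashMap<String, StreamAppendClient> APPEND_CLIENTS =
      new ConcurrentHashMap<>();

  private final String streamName = "my-stream";
  private StreamAppendClient streamAppendClient;

  StreamAppendClient getWriteStream() {
    if (streamAppendClient == null) {
      // A cache hit returns whatever is stored, even an already-closed client.
      streamAppendClient =
          APPEND_CLIENTS.computeIfAbsent(streamName, k -> new StreamAppendClient());
    }
    return streamAppendClient;
  }

  void invalidateWriteStream() {
    if (streamAppendClient != null) {
      streamAppendClient.close(); // closes the shared, cached instance
      streamAppendClient = null;  // but leaves the closed instance in the cache
    }
  }

  public static void main(String[] args) {
    StaleCacheDemo state = new StaleCacheDemo();
    state.getWriteStream().append("row-1"); // succeeds
    state.invalidateWriteStream();          // simulates the error-handling path
    state.getWriteStream().append("row-2"); // throws: Stream is already closed
  }
}
{code}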

Essentially, nothing in the code removes the closed client from the APPEND_CLIENTS cache, so a single transient append error permanently blocks the pipeline.
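One possible fix, sketched here against the simplified demo above rather than the real Beam code (the actual change would need to use whatever eviction API the APPEND_CLIENTS cache exposes), is to drop the cache entry at the moment the client is closed:

{code:java}
// Hypothetical fix, continuing the simplified demo: evict the stale entry
// when the client is invalidated, so the next getWriteStream() call creates
// a fresh client instead of reusing a closed one.
void invalidateWriteStream() {
  if (streamAppendClient != null) {
    APPEND_CLIENTS.remove(streamName, streamAppendClient); // drop the closed client
    streamAppendClient.close();
    streamAppendClient = null;
  }
}
{code}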

> BigQueryIO Storage Write API: Stream is already closed
> ------------------------------------------------------
>
>                 Key: BEAM-13954
>                 URL: https://issues.apache.org/jira/browse/BEAM-13954
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.36.0
>         Environment: GCP Dataflow, Java 11
>            Reporter: Dennis Waldron
>            Priority: P2
>
> I'm running a streaming pipeline on the latest Beam version (2.36.0), reading data from PubSub and writing to BigQuery with the following sink code:
> {code:java}
> BigQueryIO.<Row>write()
>   .to("project.datatset.table")
>   .useBeamSchema()
>   .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
>   .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); {code}
> It worked for several days before the following exception was raised:
> {noformat}
> Error message from worker: java.lang.RuntimeException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:268)
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
> Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
>         com.google.cloud.bigquery.storage.v1.StreamWriter.appendInternal(StreamWriter.java:294)
>         com.google.cloud.bigquery.storage.v1.StreamWriter.append(StreamWriter.java:272)
>         org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.appendRows(BigQueryServicesImpl.java:1243)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$1(StorageApiWriteUnshardedRecords.java:241)
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Operation.run(RetryManager.java:129)
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:245)
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:274)
>         org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
>         org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:238)
>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:433)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
>         java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         java.base/java.lang.Thread.run(Thread.java:834)
> {noformat}
> After this, the pipeline enters an endless retry loop and new data is no longer processed. As a result, Storage API streaming writes cannot be used for production workloads.
> Also reported by another user on the mailing list: [https://lists.apache.org/thread/21st7zpj3qvn5d9jrcmdroyolbf7pyro]


