Posted to user@beam.apache.org by Frank Huang <fa...@paloaltonetworks.com> on 2022/02/08 18:42:17 UTC

Issues while running with BigQueryIO.Write.Method.STORAGE_WRITE_API

Hello team,

We are testing STORAGE_WRITE_API for inserting data into BigQuery and have
seen several errors and warnings in our Dataflow pipeline (written in Java).
The pipeline may run well at first, but eventually the system lag keeps
increasing, the pipeline stops processing data from Pub/Sub, and unacked
messages pile up.

One common warning is:

Operation ongoing in step insertTableRowsToBigQuery/StorageApiLoads/StorageApiWriteSharded/Write Records for at least 03h35m00s without outputting or completing in state process
  at java.base@11.0.9/jdk.internal.misc.Unsafe.park(Native Method)
  at java.base@11.0.9/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  at java.base@11.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
  at java.base@11.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
  at java.base@11.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
  at java.base@11.0.9/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Callback.await(RetryManager.java:153)
  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Operation.await(RetryManager.java:136)
  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:256)
  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:453)
  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
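For context on the warning above: the innermost Beam frame, RetryManager$Callback.await, is parked inside CountDownLatch.await(), i.e. the worker thread is waiting for something to count a latch down. The sketch below is a minimal, stdlib-only illustration and is NOT Beam's RetryManager code; the class LatchDemo and the method waitForCallback are hypothetical, and the sketch simply assumes a latch that an asynchronous response callback is meant to release. It shows that if that callback never runs, an untimed await() would keep the thread parked (the Unsafe.park / LockSupport.park frames), whereas the timed await(long, TimeUnit) overload gives up and returns false. Whether a lost callback is the actual cause in this pipeline is not established by the trace alone.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, stdlib-only illustration (not Beam's RetryManager): a worker
 * thread waits on a CountDownLatch that a simulated async callback may release.
 */
public class LatchDemo {

    /**
     * @param callbackFires whether the simulated callback ever calls countDown()
     * @param timeoutMillis how long to wait before giving up
     * @return true if the callback released the latch in time, false on timeout
     */
    public static boolean waitForCallback(boolean callbackFires, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        if (callbackFires) {
            // Simulated response arriving on another thread shortly afterwards.
            Thread responder = new Thread(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                done.countDown();
            });
            responder.setDaemon(true);
            responder.start();
        }
        try {
            // A plain done.await() would park this thread indefinitely if
            // countDown() is never called; the timed overload returns false.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fires: " + waitForCallback(true, 2_000));
        System.out.println("callback lost:  " + waitForCallback(false, 200));
    }
}
```

Running main should print true for the first call and false for the second, once the 200 ms timeout has elapsed.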

Other errors we've seen:

Got error io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is closed

Got error io.grpc.StatusRuntimeException: ALREADY_EXIST

PodSandboxStatus of sandbox "..." for pod "df-...-pipeline-...-harness-qw4j_default(...)" error: rpc error: code = Unknown desc = Error: No such container

Code sample:

        // Imports used by this snippet
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
        import org.joda.time.Duration;

        toBq.apply("insertTableRowsToBigQuery",
                BigQueryIO
                        .writeTableRows()
                        .to(String.format("%s:%s.%s", PROJECT_ID, DATASET, table))
                        .withTriggeringFrequency(
                                Duration.standardSeconds(options.getTriggeringFrequency()))
                        .withNumStorageWriteApiStreams(options.getNumStorageWriteApiStreams())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));


Thank you
Frank