Posted to user@beam.apache.org by XQ Hu via user <us...@beam.apache.org> on 2023/05/03 13:19:42 UTC

Re: Losing records when using BigQuery IO Connector

https://github.com/apache/beam/issues/26515 tracks this issue. The fix was
merged. Thanks a lot for reporting this issue, Binh!

On Mon, Apr 17, 2023 at 12:58 PM Binh Nguyen Van <bi...@gmail.com> wrote:

> Hi,
>
> I tested with streaming inserts and file loads, and both worked as
> expected. But it looks like the Storage Write API is the new way to go, so I
> want to test it too. I am using Apache Beam v2.46.0 and running the job on
> Google Dataflow.
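>
> For context, the only line that changes between these test runs is the
> withMethod call on the same write transform from my first mail, roughly like
> this (simplified sketch, not the full pipeline; Method is
> BigQueryIO.Write.Method):
>
> BigQueryIO.<MyEvent>write()
>     .to(options.getTable())
>     .withFormatFunction(TableRowMappers::toRow)
>     // Both of these wrote all 10000 records as expected:
>     //   .withMethod(Method.STREAMING_INSERTS)
>     //   .withMethod(Method.FILE_LOADS)
>     // This is the one where records go missing:
>     .withMethod(Method.STORAGE_WRITE_API)
>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>     .withExtendedErrorInfo();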
>
> Thanks
> -Binh
>
>
> On Mon, Apr 17, 2023 at 9:53 AM Reuven Lax via user <us...@beam.apache.org>
> wrote:
>
>> What version of Beam are you using? There are no known data-loss bugs in
>> the connector; however, if there has been a regression, we would like to
>> address it with high priority.
>>
>> On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van <bi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a job that uses the BigQuery IO Connector to write to a BigQuery
>>> table. When I test it with a small number of records (100), it works as
>>> expected, but when I test it with a larger number of records (10000), I
>>> don’t see all of the records in the output table, only part of them. The
>>> number of records written changes from run to run, but so far it has never
>>> been more than 1000.
>>>
>>> There are WARNING log entries in the job log like this
>>>
>>> by client #0 failed with error, operations will be retried.
>>> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>>>
>>> And one log entry like this
>>>
>>> Finalize of stream [stream-name] finished with row_count: 683
>>>
>>> If I sum all the numbers reported in the WARNING messages with the
>>> row_count in the finalize-of-stream entry above, I get 10000, which is
>>> exactly the number of input records.
>>>
>>> My pipeline uses 1 worker and it looks like this
>>>
>>> WriteResult writeResult =
>>>     inputCollection.apply(
>>>         "Save Events To BigQuery",
>>>         BigQueryIO.<MyEvent>write()
>>>             .to(options.getTable())
>>>             .withFormatFunction(TableRowMappers::toRow)
>>>             .withMethod(Method.STORAGE_WRITE_API)
>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>             .withExtendedErrorInfo());
>>>
>>> writeResult
>>>     .getFailedStorageApiInserts()
>>>     .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
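>>>
>>> PrintFn is my own small helper; it is essentially just a DoFn that logs
>>> every element it receives, roughly like this (simplified sketch):
>>>
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> // Simplified sketch of the logging helper used in the pipeline above.
>>> class PrintFn<T> extends DoFn<T, T> {
>>>   private static final Logger LOG = LoggerFactory.getLogger(PrintFn.class);
>>>
>>>   @ProcessElement
>>>   public void processElement(@Element T element, OutputReceiver<T> out) {
>>>     // Each failed insert (a BigQueryStorageApiInsertError) is logged and re-emitted.
>>>     LOG.error("Failed insert: {}", element);
>>>     out.output(element);
>>>   }
>>> }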
>>>
>>> There are no log entries for the failed inserts.
>>>
>>> Is there anything wrong with my pipeline code, or is it a bug in the
>>> BigQuery IO Connector?
>>>
>>> Thanks
>>> -Binh
>>>
>>