Posted to user@beam.apache.org by Binh Nguyen Van <bi...@gmail.com> on 2023/04/17 07:46:19 UTC

Losing records when using BigQuery IO Connector

Hi,

I have a job that uses the BigQuery IO Connector to write to a BigQuery table.
When I test it with a small number of records (100) it works as expected,
but when I test it with a larger number of records (10000), only some of the
records are written to the output table. The count changes from run to run,
but so far I have never seen more than 1000 records.

There are WARNING log entries in the job log like this:

by client #0 failed with error, operations will be retried.
com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists:
ALREADY_EXISTS: The offset is within stream, expected offset 933,
received 0

And there is one log entry like this:

Finalize of stream [stream-name] finished with row_count: 683

If I sum all of the numbers reported in the WARNING messages together with the
row count in the finalize-of-stream entry above, I get 10000, which is exactly
the number of input records.

My pipeline uses 1 worker and looks like this:

WriteResult writeResult = inputCollection.apply(
    "Save Events To BigQuery",
    BigQueryIO.<MyEvent>write()
        .to(options.getTable())
        .withFormatFunction(TableRowMappers::toRow)
        .withMethod(Method.STORAGE_WRITE_API)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withExtendedErrorInfo());

writeResult
    .getFailedStorageApiInserts()
    .apply("Log failed inserts", ParDo.of(new PrintFn<>()));

There are no log entries for the failed inserts.
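
PrintFn is not included in the thread; the following is only a minimal sketch
of what such a DoFn might look like, assuming it simply logs each element (here,
each failed insert) and passes it through unchanged:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical stand-in for the PrintFn referenced above: logs every element
// it receives at WARN level and re-emits it.
public class PrintFn<T> extends DoFn<T, T> {
  private static final Logger LOG = LoggerFactory.getLogger(PrintFn.class);

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    LOG.warn("Failed insert: {}", element);
    out.output(element);
  }
}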

Is there anything wrong with my pipeline code, or is it a bug in the BigQuery
IO Connector?

Thanks
-Binh

Re: Losing records when using BigQuery IO Connector

Posted by XQ Hu via user <us...@beam.apache.org>.
Does FILE_LOADS (
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#FILE_LOADS)
work for your case?
STORAGE_WRITE_API has been actively improved recently. If the latest SDK
still has this issue, I highly recommend creating a Google Cloud support
ticket.
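
For reference, a minimal sketch of what switching the write from the original
post to FILE_LOADS could look like. The triggering frequency and file-shard
settings are assumptions and only apply when the input PCollection is unbounded
(streaming); in a bounded batch pipeline they should be left out.

// Sketch only: the same write as in the original post, switched to file loads.
inputCollection.apply(
    "Save Events To BigQuery",
    BigQueryIO.<MyEvent>write()
        .to(options.getTable())
        .withFormatFunction(TableRowMappers::toRow)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Unbounded input only: how often to kick off a load job and how many
        // files to shard into (Duration is org.joda.time.Duration).
        .withTriggeringFrequency(Duration.standardMinutes(5))
        .withNumFileShards(1)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

The failed-insert tap from the original pipeline is dropped here, since
getFailedStorageApiInserts() is specific to the Storage Write API path.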

On Mon, Apr 17, 2023 at 3:47 AM Binh Nguyen Van <bi...@gmail.com> wrote:

> Hi,
>
> I have a job that uses BigQuery IO Connector to write to a BigQuery table.
> When I test it with a small number of records (100) it works as expected
> but when I tested it with a larger number of records (10000), I don’t see
> all of the records written to the output table but only part of it. It
> changes from run to run but no more than 1000 records as I have seen so far.
>
> There are WARNING log entries in the job log like this
>
> by client #0 failed with error, operations will be retried.
> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>
> And one log entry like this
>
> Finalize of stream [stream-name] finished with row_count: 683
>
> If I sum all the numbers reported in the WARNING message with the one in
> the finalize of stream above, I get 10000, which is exactly the number of
> input records.
>
> My pipeline uses 1 worker and it looks like this
>
> WriteResult writeResult = inputCollection.apply(
>                 "Save Events To BigQuery",
>                 BigQueryIO.<MyEvent>write()
>                     .to(options.getTable())
>                     .withFormatFunction(TableRowMappers::toRow)
>                     .withMethod(Method.STORAGE_WRITE_API)
>                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                     .withExtendedErrorInfo());
>
>     writeResult
>         .getFailedStorageApiInserts()
>         .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>
> There are no log entries for the failed inserts.
>
> Is there anything wrong with my pipeline code or is it a bug in BigQuery
> IO Connector?
>
> Thanks
> -Binh
>

Re: Losing records when using BigQuery IO Connector

Posted by XQ Hu via user <us...@beam.apache.org>.
https://github.com/apache/beam/issues/26515 tracks this issue. The fix was
merged. Thanks a lot for reporting this issue, Binh!

Re: Losing records when using BigQuery IO Connector

Posted by Binh Nguyen Van <bi...@gmail.com>.
Hi,

I tested with streaming inserts and file loads, and both worked as expected.
But it looks like the Storage Write API is the new way to go, so I want to
test it too. I am using Apache Beam v2.46.0 and running the job on Google
Dataflow.
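
For comparison, a minimal sketch of the streaming-insert variant mentioned
above, reusing the names from the original pipeline. With STREAMING_INSERTS,
withExtendedErrorInfo() exposes failed rows through getFailedInsertsWithErr()
rather than getFailedStorageApiInserts():

WriteResult writeResult = inputCollection.apply(
    "Save Events To BigQuery",
    BigQueryIO.<MyEvent>write()
        .to(options.getTable())
        .withFormatFunction(TableRowMappers::toRow)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withExtendedErrorInfo());

// Failed rows arrive here as BigQueryInsertError elements (row plus the
// insert error), not as BigQueryStorageApiInsertError.
writeResult
    .getFailedInsertsWithErr()
    .apply("Log failed inserts", ParDo.of(new PrintFn<>()));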

Thanks
-Binh

Re: Losing records when using BigQuery IO Connector

Posted by Reuven Lax via user <us...@beam.apache.org>.
What version of Beam are you using? There are no known data-loss bugs in
the connector; however, if there has been a regression, we would like to
address it with high priority.
