Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2019/02/13 14:27:04 UTC

Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

Hello,

I have a BigQuery table that ingests a Protobuf stream from Kafka via a
Beam pipeline. The Protobuf has a `log Map<String,String>` field, which
translates to a BigQuery field "log" of type RECORD with an unknown set of
subfields.

So I scanned my whole stream to find out which schema fields to expect and
created an empty, daily-partitioned table in BigQuery with the correct
fields for this RECORD type.

Now someone is pushing a new field into this RECORD, and the import fails
because the schema no longer matches. My pipeline is configured to use
FILE_LOADS, and those load jobs do fail - but the Flink pipeline just
continues and does not throw any error.
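
(For illustration, a minimal sketch of a BigQueryIO write configured with
FILE_LOADS as described above - the table spec, schema fields, and options
are placeholders, not the actual pipeline from this thread:)

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class FileLoadsSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // "log" modeled as a RECORD with only the subfields known when the table was created.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("id").setType("STRING"),
        new TableFieldSchema().setName("log").setType("RECORD").setFields(Arrays.asList(
            new TableFieldSchema().setName("level").setType("STRING"),
            new TableFieldSchema().setName("message").setType("STRING")))));

    p.apply(Create.of(new TableRow().set("id", "1")
            .set("log", new TableRow().set("level", "INFO").set("message", "hello")))
            .withCoder(TableRowJsonCoder.of()))
        .apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")   // placeholder table spec
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // With an unbounded source such as KafkaIO, FILE_LOADS additionally
            // requires .withTriggeringFrequency(...) (and optionally .withNumFileShards(...)).
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
  }
}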

My Beam version is 2.10-SNAPSHOT running on Flink 1.6. My two questions are:

1. How can I make the pipeline fail hard in this situation?
2. How can I prevent this from happening in the first place? If I create
the table with the correct schema up front, the table schema seems to be
overwritten and autodetected again (without the added field), which causes
the load to fail. If I alter the schema and add the field manually while
the pipeline is running, that fixes it, but by then I already have a dozen
failed imports. The main issue seems to be that my Protobuf schema only
defines a Map<String,String> type, which is not specific enough to derive
the full matching table schema from.

Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Actually, the number of retries is 1000 by default, so it might not have
reached that number yet. I will have to test that again.
https://github.com/apache/beam/blob/38daf8c45b14a94665939b562ab947dc72ad8f8f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1901
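
(If the goal is to surface failures sooner, a hedged sketch of capping the
load-job retries - this assumes the Beam version in use exposes
BigQueryIO.Write#withMaxRetryJobs, so check your SDK; the cap of 3 is
arbitrary:)

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class CappedRetriesSketch {
  // Cap retries so a persistently failing FILE_LOAD surfaces after a few
  // attempts instead of being retried up to the (much higher) default limit.
  static BigQueryIO.Write<TableRow> cappedWrite() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")            // placeholder table spec
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withMaxRetryJobs(3);                            // assumed API; verify against your Beam version
  }
}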


Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Thank you Jeff,

I think 1 is a bug and I am planning to report it in the bug tracker.
Regarding 2, I now have a fixed TableSchema supplied to this pipeline with
the expected fields, and I am ignoring unknown fields.
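
(A minimal sketch of that combination - a pinned TableSchema plus
ignoreUnknownValues(); the table spec and field names are placeholders, not
the actual schema from this thread:)

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class FixedSchemaSketch {
  // A write that pins the table schema explicitly and drops values for any
  // payload fields that are not part of it, instead of relying on autodetection.
  static BigQueryIO.Write<TableRow> write() {
    TableSchema fixedSchema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("id").setType("STRING"),
        new TableFieldSchema().setName("log").setType("RECORD").setFields(Arrays.asList(
            new TableFieldSchema().setName("level").setType("STRING"),
            new TableFieldSchema().setName("message").setType("STRING"),
            new TableFieldSchema().setName("new_field").setType("STRING")))));

    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")            // placeholder table spec
        .withSchema(fixedSchema)
        .ignoreUnknownValues()                           // unknown payload fields are dropped, not fatal
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
  }
}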

Best,
Tobi


Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

Posted by Jeff Klukas <jk...@mozilla.com>.
To question 1, I also would have expected the pipeline to fail when files
fail to load; I'm not sure why it doesn't. I thought the BigQuery API
returns a 400-level response code in that case and that it would bubble up
to a pipeline execution error, but I haven't dug through the code to verify
that.

As to question 2, the lack of a native map type in BigQuery means you're in
a bit of a tough spot. BigQuery loads from Avro handle this by modeling the
map type as an array of (key, value) structs [0]. You could modify your
payload to match that same sort of structure, which would transparently
handle future additions of new keys, but may not be as convenient for
querying.
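
(For illustration, a hedged sketch of what that layout looks like as a
BigQuery TableSchema - "log" becomes a REPEATED RECORD of (key, value)
STRING pairs, so new map keys need no schema change; field names are
placeholders:)

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

public class MapAsRepeatedStructSketch {
  // "log" as an array of (key, value) structs: new map keys simply become new
  // array elements at load time, so the table schema never has to change.
  static TableSchema logAsKeyValueArray() {
    return new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("id").setType("STRING"),
        new TableFieldSchema().setName("log").setType("RECORD").setMode("REPEATED")
            .setFields(Arrays.asList(
                new TableFieldSchema().setName("key").setType("STRING"),
                new TableFieldSchema().setName("value").setType("STRING")))));
  }
}

Queries then typically go through UNNEST(log) to filter on a particular
key, which is the querying trade-off mentioned above.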

Otherwise, if you want to model the "log" structure as a struct in
BigQuery, I think you'd need to provide the schemas as a side input to
BigQueryIO. Perhaps you could have a branch of your pipeline look for new
keys in the payload and output a view of the schema.
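
(A hedged sketch of that idea using BigQueryIO's withSchemaFromView, which
takes a PCollectionView mapping table specs to JSON-encoded TableSchemas;
the hard part - deriving the schema from the observed payload - is reduced
to a static placeholder here:)

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SchemaSideInputSketch {
  static BigQueryIO.Write<TableRow> writeWithComputedSchema(PCollection<TableRow> rows) {
    // Placeholder branch: a real pipeline would scan the payload for new keys
    // and emit an up-to-date JSON-encoded TableSchema per table spec.
    PCollectionView<Map<String, String>> schemaView =
        rows.getPipeline()
            .apply("SchemaPerTable",
                Create.of(KV.of(
                    "my-project:my_dataset.my_table",
                    "{\"fields\":[{\"name\":\"id\",\"type\":\"STRING\"}]}")))
            .apply(View.asMap());

    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")   // placeholder table spec
        .withSchemaFromView(schemaView)         // schemas supplied as a side input
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS);
  }
}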

Another option would be to set ignoreUnknownValues() which would drop
values for any fields not existing in the BQ table. Dropping those values
may or may not be acceptable for your use case.

[0]
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
