Posted to user@beam.apache.org by Josh <jo...@gmail.com> on 2017/05/16 12:02:51 UTC

How to skip processing when specific exception is thrown?

Hi all,

I am wondering whether there is a way to make Beam skip certain failures.
For example, I am using BigQueryIO to write to a table, where the table name
is chosen dynamically:


```
.apply(BigQueryIO.<TableRow>write()
    .to(new ExtractTableName()))
```


I want to make it so that, if for some reason my ExtractTableName instance
(which is a SerializableFunction<ValueInSingleWindow<TableRow>,
TableDestination>) throws an exception, then the exception is logged and
the write is skipped.


Is it possible to achieve this behaviour without modifying the Beam
codebase/BigQueryIO retry logic?

At the moment if my function throws an exception, the write is retried
indefinitely.


Thanks,

Josh

Re: How to skip processing when specific exception is thrown?

Posted by Josh <jo...@gmail.com>.
Hi Dan,

Ok I see, that makes sense. I thought it might make things easier if there
was a way to define a strategy for handling certain exceptions (e.g. where
the strategy could be to skip processing the record). But I understand the
worry about making data loss easy. I could probably refactor my pipeline
and move the exception-throwing code from the tablespec function to a DoFn,
which only outputs an element if the table name is found successfully, and
then make the tablespec function something very simple. But I will hack it
like you described for now!

Thanks,
Josh
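
The refactoring described above (a step that only emits an element when the table name is found, leaving the tablespec function trivial) could look roughly like the sketch below. It is plain Java rather than Beam code: `processElement` stands in for a DoFn's per-element method, the `BiConsumer` stands in for the DoFn's output, and `lookupTable`, the `"table"` field and the `my_dataset` prefix are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.logging.Logger;

final class SkipOnMissingTable {
    private static final Logger LOG = Logger.getLogger(SkipOnMissingTable.class.getName());

    // Hypothetical lookup: throws when the row carries no table name.
    static String lookupTable(Map<String, Object> row) {
        Object table = row.get("table");
        if (table == null) {
            throw new IllegalStateException("no 'table' field in " + row);
        }
        return "my_dataset." + table;
    }

    // Stand-in for a DoFn's per-element method: emit (table, row) on success,
    // otherwise log the failure and emit nothing, so the row never reaches the write.
    static void processElement(Map<String, Object> row,
                               BiConsumer<String, Map<String, Object>> output) {
        String table;
        try {
            table = lookupTable(row);
        } catch (RuntimeException e) {
            LOG.warning("Skipping row: " + e.getMessage());
            return;
        }
        // Emit outside the try block so downstream failures are not swallowed.
        output.accept(table, row);
    }

    public static void main(String[] args) {
        Map<String, Object> good = Map.of("table", "events", "id", 1);
        Map<String, Object> bad = Map.of("id", 2);

        List<String> written = new ArrayList<>();
        for (Map<String, Object> row : List.of(good, bad)) {
            processElement(row, (table, r) -> written.add(table + " <- " + r.get("id")));
        }
        System.out.println(written); // prints [my_dataset.events <- 1]
    }
}
```

Because the table name travels with the element, the function passed to `.to(...)` would only need to read it back and has nothing left that can throw. Skipped rows are only visible in the log, so the log line is the one record of what was dropped.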

On Tue, May 16, 2017 at 2:06 PM, Dan Halperin <dh...@google.com> wrote:

> Hey Josh,
>
> There isn't really generic functionality for this as we don't want to make
> "data loss" easy. There are some ongoing designs for specific transforms
> (e.g., BEAM-190 for BigQueryIO). One easy thing to do in this case might be
> to wrap the code in a try/catch and if you catch an exception then return
> some table name like "leftovers".
>
> Dan

Re: How to skip processing when specific exception is thrown?

Posted by Dan Halperin <dh...@google.com>.
Hey Josh,

There isn't really generic functionality for this as we don't want to make
"data loss" easy. There are some ongoing designs for specific transforms
(e.g., BEAM-190 for BigQueryIO). One easy thing to do in this case might be
to wrap the code in a try/catch and if you catch an exception then return
some table name like "leftovers".

Dan
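
A minimal sketch of the try/catch idea, for illustration only. It uses `java.util.function.Function` and a `Map` as stand-ins for Beam's `SerializableFunction` and `TableRow` so that it runs without any dependencies; `withFallback`, `extractTableName`, the `"table"` field and the `my_dataset.leftovers` spec are hypothetical names, not part of Beam or of the original pipeline.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FallbackTableName {
    private static final Logger LOG = Logger.getLogger(FallbackTableName.class.getName());

    // Hypothetical catch-all table; "leftovers" is the name suggested above.
    static final String FALLBACK_TABLE = "my_dataset.leftovers";

    // Wraps a table-name function so that a runtime failure is logged and
    // the element is routed to the fallback table instead of being retried.
    static <T> Function<T, String> withFallback(Function<T, String> extractor, String fallback) {
        return element -> {
            try {
                return extractor.apply(element);
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "Table name extraction failed, using " + fallback, e);
                return fallback;
            }
        };
    }

    // Hypothetical extractor: fails when the row has no "table" field.
    static String extractTableName(Map<String, Object> row) {
        Object table = row.get("table");
        if (table == null) {
            throw new IllegalArgumentException("row has no 'table' field");
        }
        return "my_dataset." + table;
    }

    public static void main(String[] args) {
        Function<Map<String, Object>, String> safe =
            withFallback(FallbackTableName::extractTableName, FALLBACK_TABLE);

        Map<String, Object> good = Map.of("table", "events");
        Map<String, Object> bad = Map.of("user", "josh");

        System.out.println(safe.apply(good)); // prints my_dataset.events
        System.out.println(safe.apply(bad));  // prints my_dataset.leftovers
    }
}
```

In the real pipeline the same try/catch would presumably sit inside `ExtractTableName` itself and return a `TableDestination` for the fallback table. Nothing is dropped this way: rows that could not be routed end up in one table where they can be inspected later.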
