Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2019/10/02 13:48:09 UTC

containThrowable missing in ExceptionUtils

Hi,

I’m trying to implement a failure handler for Elasticsearch, based on the example in the Flink documentation:

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));

However, I can only find ExceptionUtils.containsThrowable in Flink 1.3; it is not present in 1.8. Am I mistaken? If not, how can I implement the same check using findThrowable?

TIA

Nick

Re: containThrowable missing in ExceptionUtils

Posted by Chesnay Schepler <ch...@apache.org>.
The listed method no longer exists and was subsumed by 
ExceptionUtils#findThrowable, which also gives access to the Throwable 
if it could be found.

I have filed FLINK-14334 for updating the documentation.
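
As a rough sketch of the migration, assuming findThrowable(Throwable, Class<T>) returns an Optional<T>: each containsThrowable(failure, X.class) check in the handler could become ExceptionUtils.findThrowable(failure, X.class).isPresent(). The snippet below uses a local stand-in for findThrowable so that it runs without Flink on the classpath. The class name FindThrowableSketch, the containsThrowable wrapper, and the JDK exception types used in place of the Elasticsearch ones are all illustrative assumptions, not Flink code.

```java
import java.util.Optional;

final class FindThrowableSketch {

    // Local stand-in for ExceptionUtils.findThrowable (an assumption about its
    // behavior): walk the cause chain and return the first throwable that is
    // an instance of searchType, or an empty Optional if none matches.
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Hypothetical replacement for the removed containsThrowable check.
    static boolean containsThrowable(Throwable throwable, Class<? extends Throwable> searchType) {
        return findThrowable(throwable, searchType).isPresent();
    }

    public static void main(String[] args) {
        // IllegalStateException stands in for EsRejectedExecutionException here.
        Throwable failure = new RuntimeException("bulk request failed",
                new IllegalStateException("queue full"));

        // prints "true": the nested cause is found
        System.out.println(containsThrowable(failure, IllegalStateException.class));

        // prints "false": no such exception anywhere in the chain
        System.out.println(containsThrowable(failure, IllegalArgumentException.class));

        // Unlike a plain boolean check, the Optional also exposes the match;
        // prints "queue full"
        System.out.println(findThrowable(failure, IllegalStateException.class)
                .map(Throwable::getMessage)
                .orElse("none"));
    }
}
```

In the failure handler itself, only the two if conditions would need to change; the re-add, drop, and rethrow branches stay as they are.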
