Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2019/10/02 13:48:09 UTC
containThrowable missing in ExceptionUtils
Hi,
I’m trying to implement a failure handler for ElasticSearch, based on the example in the Flink documentation:
DataStream<String> input = ...;
input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
    }));
However, I can only find ExceptionUtils.containsThrowable in Flink 1.3. It is not present in 1.8. Am I mistaken? If not, how can I implement this using findThrowable?
TIA
Nick
Re: containThrowable missing in ExceptionUtils
Posted by Chesnay Schepler <ch...@apache.org>.
The listed method no longer exists and was subsumed by
ExceptionUtils#findThrowable, which also gives access to the Throwable
if it could be found.
I have filed FLINK-14334 for updating the documentation.
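For illustration, here is a minimal sketch of how the containsThrowable checks could be expressed with an Optional-returning lookup. Flink's ExceptionUtils#findThrowable(Throwable, Class) returns an Optional, so in the real handler the condition would presumably become ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent(). To keep the sketch runnable without Flink or Elasticsearch on the classpath, it uses a local stand-in for findThrowable and two hypothetical exception classes (QueueFullException, MalformedDocumentException) in place of EsRejectedExecutionException and ElasticsearchParseException; the Decision enum and decide method are likewise assumptions made up for the example.

```java
import java.util.Optional;

public class FindThrowableSketch {

    // Local stand-in (assumption) mirroring the shape of Flink's
    // ExceptionUtils.findThrowable: walk the cause chain and return the
    // first Throwable that is an instance of the requested type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable t = throwable;
        while (t != null) {
            if (searchType.isInstance(t)) {
                return Optional.of(searchType.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    // Hypothetical placeholder for EsRejectedExecutionException.
    static class QueueFullException extends RuntimeException {
        QueueFullException() { super("queue full"); }
    }

    // Hypothetical placeholder for ElasticsearchParseException.
    static class MalformedDocumentException extends RuntimeException {
        MalformedDocumentException() { super("malformed document"); }
    }

    // Hypothetical outcome type standing in for the handler's three branches.
    enum Decision { RETRY, DROP, FAIL }

    // Same branching as the failure handler, with containsThrowable(...)
    // replaced by findThrowable(...).isPresent().
    static Decision decide(Throwable failure) {
        if (findThrowable(failure, QueueFullException.class).isPresent()) {
            return Decision.RETRY; // full queue; the handler would call indexer.add(action)
        } else if (findThrowable(failure, MalformedDocumentException.class).isPresent()) {
            return Decision.DROP;  // malformed document; drop without failing the sink
        } else {
            return Decision.FAIL;  // any other failure; the handler would rethrow
        }
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("bulk request failed", new QueueFullException());
        System.out.println(decide(wrapped));
        System.out.println(decide(new MalformedDocumentException()));
        System.out.println(decide(new IllegalStateException("something else")));
    }
}
```

Because findThrowable hands back the matching Throwable itself, a handler can also inspect or log the matched exception (for example via ifPresent) instead of only testing for its presence.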