You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/12/28 14:40:00 UTC
[jira] [Created] (FLINK-30526) Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated
Martijn Visser created FLINK-30526:
--------------------------------------
Summary: Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated
Key: FLINK-30526
URL: https://issues.apache.org/jira/browse/FLINK-30526
Project: Flink
Issue Type: Bug
Components: Connectors / Opensearch
Reporter: Martijn Visser
{quote} Hi everyone,
I have a streaming application that has Elasticsearch sink.
I Upgraded flink version from 1.11 to 1.16 and also moved from ES 7 to OpenSearch 2.0, and now I'm facing some deprected issues, hope you can help me.
In the previous version I created ElasticsearchSink and added a failure handler, which protected the sink to not fail on some exceptions.
final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
log.warn("Got malformed document , action {}", action);
// malformed document; simply drop elasticsearchSinkFunction without failing sink
} else if (failure instanceof IOException && failure.getCause() instanceof NullPointerException && failure.getMessage().contains("Unable to parse response body")) {
//issue with ES 7 and opensearch - that does not send type - while response is waiting for it
//at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type);
log.debug("known issue format the response for ES 7.5.1 and DB OS (opensearch) :{}", failure.getMessage());
} else {
// for all other failures, log and don't fail the sink
log.error("Got error while trying to perform ES action {}", action, failure);
}
};
final ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
In the new version the class ActionRequestFailureHandler is deprecated and after investigation I can't find any way to handle failures.
For all failures the sink fails.
Is there anything I didn't see?
Thanks is advance!
{quote}
From the Apache Flink Slack channel https://apache-flink.slack.com/archives/C03G7LJTS2G/p1672122873318899
--
This message was sent by Atlassian Jira
(v8.20.10#820010)