You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/10/10 16:36:19 UTC
[flink] branch release-1.8 updated: [FLINK-14334][es][docs] Use ExceptionUtils#findThrowable
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 0281b73 [FLINK-14334][es][docs] Use ExceptionUtils#findThrowable
0281b73 is described below
commit 0281b73d9b50759def94d0350bf2ded7b910e0e3
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Oct 7 10:40:58 2019 +0200
[FLINK-14334][es][docs] Use ExceptionUtils#findThrowable
This closes #9849.
---
docs/dev/connectors/elasticsearch.md | 8 ++++----
.../streaming/connectors/cassandra/CassandraFailureHandler.java | 2 +-
.../connectors/elasticsearch/ActionRequestFailureHandler.java | 4 ++--
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index d0a159b..f60530e 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -526,10 +526,10 @@ input.addSink(new ElasticsearchSink<>(
int restStatusCode,
RequestIndexer indexer) throw Throwable {
- if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
- } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+ } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
@@ -554,10 +554,10 @@ input.addSink(new ElasticsearchSink(
int restStatusCode,
RequestIndexer indexer) {
- if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action)
- } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+ } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
index 478ba47..8bcb0ca 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
*
* @Override
* void onFailure(Throwable failure) throws IOException {
- * if (ExceptionUtils.containsThrowable(failure, WriteTimeoutException.class)) {
+ * if (ExceptionUtils.findThrowable(failure, WriteTimeoutException.class).isPresent()) {
* // drop exception
* } else {
* // for all other failures, fail the sink;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index 260f80e..1514aba 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -36,10 +36,10 @@ import java.io.Serializable;
*
* @Override
* void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
- * if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ * if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
* // full queue; re-add document for indexing
* indexer.add(action);
- * } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+ * } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
* // malformed document; simply drop request without failing sink
* } else {
* // for all other failures, fail the sink;