You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/10/10 16:36:19 UTC

[flink] branch release-1.8 updated: [FLINK-14334][es][docs] Use ExceptionUtils#findThrowable

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 0281b73  [FLINK-14334][es][docs] Use ExceptionUtils#findThrowable
0281b73 is described below

commit 0281b73d9b50759def94d0350bf2ded7b910e0e3
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Oct 7 10:40:58 2019 +0200

    [FLINK-14334][es][docs] Use ExceptionUtils#findThrowable
    
    This closes #9849.
---
 docs/dev/connectors/elasticsearch.md                              | 8 ++++----
 .../streaming/connectors/cassandra/CassandraFailureHandler.java   | 2 +-
 .../connectors/elasticsearch/ActionRequestFailureHandler.java     | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index d0a159b..f60530e 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -526,10 +526,10 @@ input.addSink(new ElasticsearchSink<>(
                 int restStatusCode,
                 RequestIndexer indexer) throws Throwable {
 
-            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                 // full queue; re-add document for indexing
                 indexer.add(action);
-            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                 // malformed document; simply drop request without failing sink
             } else {
                 // for all other failures, fail the sink
@@ -554,10 +554,10 @@ input.addSink(new ElasticsearchSink(
                 int restStatusCode,
                 RequestIndexer indexer) {
 
-            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                 // full queue; re-add document for indexing
                 indexer.add(action)
-            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                 // malformed document; simply drop request without failing sink
             } else {
                 // for all other failures, fail the sink
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
index 478ba47..8bcb0ca 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
  *
  * 		@Override
  * 		void onFailure(Throwable failure) throws IOException {
- * 			if (ExceptionUtils.containsThrowable(failure, WriteTimeoutException.class)) {
+ * 			if (ExceptionUtils.findThrowable(failure, WriteTimeoutException.class).isPresent()) {
  * 				// drop exception
  * 			} else {
  * 				// for all other failures, fail the sink;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index 260f80e..1514aba 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -36,10 +36,10 @@ import java.io.Serializable;
  *
  *		@Override
  *		void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
- *			if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ *			if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
  *				// full queue; re-add document for indexing
  *				indexer.add(action);
- *			} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
+ *			} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
  *				// malformed document; simply drop request without failing sink
  *			} else {
  *				// for all other failures, fail the sink;