You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/02 11:32:06 UTC
[flink] branch master updated: [FLINK-11235][es] Close transportclient if no connection could be established
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4febcdc [FLINK-11235][es] Close transportclient if no connection could be established
4febcdc is described below
commit 4febcdc289bc0dce1090d831c7f99848a26f6465
Author: 谢磊 <22...@qq.com>
AuthorDate: Wed Jan 2 19:31:59 2019 +0800
[FLINK-11235][es] Close transportclient if no connection could be established
---
.../connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java | 5 +++++
.../connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java | 5 +++++
2 files changed, 10 insertions(+)
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 73a69eb..34f2ad3 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -72,6 +73,10 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<T
// verify that we actually are connected to a cluster
if (transportClient.connectedNodes().isEmpty()) {
+
+ // close the transportClient here
+ IOUtils.closeQuietly(transportClient);
+
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
}
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index a3453ec..d59b8e9 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -78,6 +79,10 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<T
// verify that we actually are connected to a cluster
if (transportClient.connectedNodes().isEmpty()) {
+
+ // close the transportClient here
+ IOUtils.closeQuietly(transportClient);
+
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
}