You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/02 11:34:43 UTC

[flink] branch release-1.6 updated: [FLINK-11235][es] Close transportclient if no connection could be established

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 1702c99  [FLINK-11235][es] Close transportclient if no connection could be established
1702c99 is described below

commit 1702c9902c9e03e98473035bf82f4b07fffa3bb6
Author: 谢磊 <22...@qq.com>
AuthorDate: Wed Jan 2 19:31:59 2019 +0800

    [FLINK-11235][es] Close transportclient if no connection could be established
---
 .../connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java       | 5 +++++
 .../connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java       | 5 +++++
 2 files changed, 10 insertions(+)

diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 73a69eb..34f2ad3 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -72,6 +73,10 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<T
 
 		// verify that we actually are connected to a cluster
 		if (transportClient.connectedNodes().isEmpty()) {
+
+			// close the transportClient here
+			IOUtils.closeQuietly(transportClient);
+
 			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index a3453ec..d59b8e9 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.elasticsearch.action.bulk.BackoffPolicy;
@@ -78,6 +79,10 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<T
 
 		// verify that we actually are connected to a cluster
 		if (transportClient.connectedNodes().isEmpty()) {
+
+			// close the transportClient here
+			IOUtils.closeQuietly(transportClient);
+
 			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
 		}