You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/30 17:54:13 UTC

[flink] branch master updated: [FLINK-25961] [connectors/elasticsearch] Remove transport client from Elasticsearch 6/7 connectors (tests only)

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 107f17b  [FLINK-25961] [connectors/elasticsearch] Remove transport client from Elasticsearch 6/7 connectors (tests only)
107f17b is described below

commit 107f17b0427bae96a3cc18b77191421cd09888b2
Author: Andriy Redko <an...@aiven.io>
AuthorDate: Fri Feb 4 10:44:57 2022 -0500

    [FLINK-25961] [connectors/elasticsearch] Remove transport client from Elasticsearch 6/7 connectors (tests only)
    
    Signed-off-by: Andriy Redko <an...@aiven.io>
---
 .../flink-connector-elasticsearch-base/pom.xml     |  8 ----
 .../elasticsearch/ElasticsearchSinkTestBase.java   |  8 ++--
 .../testutils/SourceSinkDataTestKit.java           | 10 +++--
 .../flink-connector-elasticsearch6/pom.xml         | 18 ---------
 .../table/Elasticsearch6DynamicSinkITCase.java     | 43 +++++++++++-----------
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 15 ++------
 .../flink-connector-elasticsearch7/pom.xml         | 18 ---------
 .../table/Elasticsearch7DynamicSinkITCase.java     | 37 ++++++++++---------
 .../elasticsearch7/ElasticsearchSinkITCase.java    | 15 ++------
 9 files changed, 59 insertions(+), 113 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index a741766..89ae167 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -96,14 +96,6 @@ under the License.
 
 		<!-- Tests -->
 
-		<!-- For PrebuiltTransportClient use in test -->
-		<dependency>
-			<groupId>org.elasticsearch.client</groupId>
-			<artifactId>transport</artifactId>
-			<version>${elasticsearch.version}</version>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>elasticsearch</artifactId>
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 1d250b7..84342fc 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 import org.apache.flink.test.util.AbstractTestBase;
 
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,9 +44,7 @@ import static org.junit.Assert.fail;
 public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A>
         extends AbstractTestBase {
 
-    // It's not good that we're using a Client here instead of a Rest Client but we need this
-    // for compatibility with ES 5.3.x. As soon as we drop that we can use RestClient here.
-    protected abstract Client getClient();
+    protected abstract RestHighLevelClient getClient();
 
     protected abstract String getClusterName();
 
@@ -90,7 +88,7 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A>
         env.execute("Elasticsearch Sink Test");
 
         // verify the results
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
 
         SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
index 41e199d..584ed4d 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
@@ -26,7 +26,8 @@ import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.junit.Assert;
@@ -136,10 +137,13 @@ public class SourceSinkDataTestKit {
      * @param client The client to use to connect to Elasticsearch
      * @param index The index to check
      */
-    public static void verifyProducedSinkData(Client client, String index) {
+    public static void verifyProducedSinkData(RestHighLevelClient client, String index)
+            throws IOException {
         for (int i = 0; i < NUM_ELEMENTS; i++) {
             GetResponse response =
-                    client.get(new GetRequest(index, TYPE_NAME, Integer.toString(i))).actionGet();
+                    client.get(
+                            new GetRequest(index, TYPE_NAME, Integer.toString(i)),
+                            RequestOptions.DEFAULT);
             Assert.assertEquals(DATA_PREFIX + i, response.getSource().get(DATA_FIELD_NAME));
         }
     }
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index 3af8f66..ad428de 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -139,24 +139,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<!--
-   			 Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 6.x
-		-->
-
-		<dependency>
-			<groupId>org.elasticsearch.client</groupId>
-			<artifactId>transport</artifactId>
-			<version>${elasticsearch.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.elasticsearch.plugin</groupId>
-			<artifactId>transport-netty4-client</artifactId>
-			<version>${elasticsearch.version}</version>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-api</artifactId>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 0ebc52b..6717e8f 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -40,12 +40,13 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.http.HttpHost;
 import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
@@ -74,12 +75,9 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
             new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6));
 
     @SuppressWarnings("deprecation")
-    protected final Client getClient() {
-        TransportAddress transportAddress =
-                new TransportAddress(elasticsearchContainer.getTcpHost());
-        String expectedClusterName = "docker-cluster";
-        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
-        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    protected final RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress())));
     }
 
     @Test
@@ -147,10 +145,11 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
         environment.<RowData>fromElements(rowData).addSink(sinkFunction);
         environment.execute();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
-                        .actionGet()
+                client.get(
+                                new GetRequest(index, myType, "1_2012-12-12T12:12:12"),
+                                RequestOptions.DEFAULT)
                         .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
@@ -213,10 +212,11 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
-                        .actionGet()
+                client.get(
+                                new GetRequest(index, myType, "1_2012-12-12T12:12:12"),
+                                RequestOptions.DEFAULT)
                         .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
@@ -285,14 +285,14 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
 
         // search API does not return documents that were not indexed, we might need to query
         // the index a few times
         Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
         SearchHits hits;
         do {
-            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            hits = client.search(new SearchRequest(index), RequestOptions.DEFAULT).getHits();
             if (hits.getTotalHits() < 2) {
                 Thread.sleep(200);
             }
@@ -363,10 +363,11 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1"))
-                        .actionGet()
+                client.get(
+                                new GetRequest("dynamic-index-2012-12-12", myType, "1"),
+                                RequestOptions.DEFAULT)
                         .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
index b29e958..c2fd9f8 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -26,11 +26,8 @@ import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTest
 import org.apache.flink.util.DockerImageVersions;
 
 import org.apache.http.HttpHost;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -57,13 +54,9 @@ public class ElasticsearchSinkITCase
     }
 
     @Override
-    @SuppressWarnings("deprecation")
-    protected final Client getClient() {
-        TransportAddress transportAddress =
-                new TransportAddress(elasticsearchContainer.getTcpHost());
-        String expectedClusterName = getClusterName();
-        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
-        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    protected final RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress())));
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml
index 3c4bb4f..67348d4 100644
--- a/flink-connectors/flink-connector-elasticsearch7/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml
@@ -142,24 +142,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<!--
-   			 Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 7.x
-		-->
-
-		<dependency>
-			<groupId>org.elasticsearch.client</groupId>
-			<artifactId>transport</artifactId>
-			<version>${elasticsearch.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.elasticsearch.plugin</groupId>
-			<artifactId>transport-netty4-client</artifactId>
-			<version>${elasticsearch.version}</version>
-			<scope>test</scope>
-		</dependency>
-
 		<!-- Table API integration tests -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index fdf7a3a..3534f4a 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -40,12 +40,13 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.http.HttpHost;
 import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
@@ -74,12 +75,9 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
             new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7));
 
     @SuppressWarnings("deprecation")
-    protected final Client getClient() {
-        TransportAddress transportAddress =
-                new TransportAddress(elasticsearchContainer.getTcpHost());
-        String expectedClusterName = "docker-cluster";
-        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
-        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    protected final RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress())));
     }
 
     @Test
@@ -142,9 +140,10 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
         environment.<RowData>fromElements(rowData).addSink(sinkFunction);
         environment.execute();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12"), RequestOptions.DEFAULT)
+                        .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
         expectedMap.put("b", "00:00:12");
@@ -202,9 +201,10 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12"), RequestOptions.DEFAULT)
+                        .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
         expectedMap.put("b", "00:00:12");
@@ -268,14 +268,14 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
 
         // search API does not return documents that were not indexed, we might need to query
         // the index a few times
         Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
         SearchHits hits;
         do {
-            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            hits = client.search(new SearchRequest(index), RequestOptions.DEFAULT).getHits();
             if (hits.getTotalHits().value < 2) {
                 Thread.sleep(200);
             }
@@ -342,9 +342,10 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
                 .executeInsert("esTable")
                 .await();
 
-        Client client = getClient();
+        RestHighLevelClient client = getClient();
         Map<String, Object> response =
-                client.get(new GetRequest("dynamic-index-2012-12-12", "1")).actionGet().getSource();
+                client.get(new GetRequest("dynamic-index-2012-12-12", "1"), RequestOptions.DEFAULT)
+                        .getSource();
         Map<Object, Object> expectedMap = new HashMap<>();
         expectedMap.put("a", 1);
         expectedMap.put("b", "2012-12-12 12:12:12");
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java
index e5f89bd..e5a5527 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java
@@ -26,11 +26,8 @@ import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTest
 import org.apache.flink.util.DockerImageVersions;
 
 import org.apache.http.HttpHost;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -57,13 +54,9 @@ public class ElasticsearchSinkITCase
     }
 
     @Override
-    @SuppressWarnings("deprecation")
-    protected final Client getClient() {
-        TransportAddress transportAddress =
-                new TransportAddress(elasticsearchContainer.getTcpHost());
-        String expectedClusterName = getClusterName();
-        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
-        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    protected final RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress())));
     }
 
     @Test