Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/08 09:25:35 UTC

[GitHub] [flink] afedulov opened a new pull request, #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

afedulov opened a new pull request, #19405:
URL: https://github.com/apache/flink/pull/19405

   This PR introduces Java-based E2E tests for the Elasticsearch connectors based on the Sink testing framework and removes the legacy bash-based tests [TBD].


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853933757


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   Whoops, this was not supposed to be merged. Given how the test is designed, the exception is swallowed by default; this is a leftover from my debugging.




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853980302


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/pom.xml:
##########
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch7</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 7 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-elasticsearch-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>

Review Comment:
   Same story as in https://github.com/apache/flink/pull/19405#discussion_r853980008





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853933757


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   This was not going to make it into the final commit; I left it in for the still-open discussion about test stability without a "sleep" in the reader. The exception is simply swallowed in the base tests.
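A minimal sketch of what replacing a fixed "sleep" in the reader could look like: re-check a condition at a short interval until it holds or a deadline passes. The class `PollUntil` and its `pollUntil` method are hypothetical, not part of Flink's testing framework. The PR's base class already imports `CommonTestUtils.waitUntilCondition`, which plays a similar role, but its signature is not shown here, so this sketch sticks to the JDK.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: re-checks a condition until it holds or a deadline passes. */
final class PollUntil {

    private PollUntil() {}

    /**
     * Evaluates {@code condition} every {@code interval} until it returns true.
     *
     * @return true if the condition held before {@code timeout} elapsed, false on timeout or
     *     interruption
     */
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of System.nanoTime() values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                // A short pause between checks instead of one long, fixed sleep.
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] checks = {0};
        // Simulates data that only becomes visible (e.g. after an index refresh) on the third check.
        boolean visible =
                pollUntil(() -> ++checks[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("visible=" + visible + " after " + checks[0] + " checks");
    }
}
```

The test then waits only as long as the data actually needs to become visible, while the timeout still bounds a genuinely failing run.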
   





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1114720108

   Thanks, @fapaul, I squashed the review commits.



[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r848940015


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    @Override
+    public Sink<TupleC2<String, String>> createSink(TestingSinkSettings sinkSettings) {
+        return new Elasticsearch6SinkBuilder<TupleC2<String, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new Elasticsearch6TestEmitter(indexName))
+                .setBulkFlushMaxActions(100) // emit after every element, don't buffer
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<TupleC2<String, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        Elasticsearch6Utils.refreshIndex(client, indexName);
+        return new Elasticsearch6DataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public List<TupleC2<String, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<TupleC2<String, String>> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        for (int i = 0; i < recordNum; i++) {
+            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            String key = Integer.toString(i);
+            String value = RandomStringUtils.random(valueLength, true, true);
+            randomStringRecords.add(TupleC2.of(key, value));
+        }
+        return randomStringRecords;

Review Comment:
   Copy-paste from Kafka tests. Migrated.
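For reference, the `generateTestData` logic quoted above can be sketched in plain Java. `Map.Entry` stands in for the PR's `TupleC2` type, and the alphanumeric picker is a hypothetical replacement for `RandomStringUtils.random(valueLength, true, true)`. One difference, stated with some hedging: as far as I know, that `RandomStringUtils` overload draws from its own internal random source rather than the seeded `Random`, so in the quoted code only the record count and the value lengths follow `seed`. Drawing every character from the seeded `Random`, as below, makes the whole data set reproducible.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class TestDataSketch {

    // Same bounds as the constants in Elasticsearch6SinkExternalContext.
    static final int RANDOM_STRING_MAX_LENGTH = 50;
    static final int NUM_RECORDS_UPPER_BOUND = 500;
    static final int NUM_RECORDS_LOWER_BOUND = 100;

    private static final String ALPHANUMERIC =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    /** Generates (key, value) records; every random choice comes from the seeded Random. */
    static List<Map.Entry<String, String>> generateTestData(long seed) {
        Random random = new Random(seed);
        // Record count in [NUM_RECORDS_LOWER_BOUND, NUM_RECORDS_UPPER_BOUND).
        int recordNum =
                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
                        + NUM_RECORDS_LOWER_BOUND;
        List<Map.Entry<String, String>> records = new ArrayList<>(recordNum);
        for (int i = 0; i < recordNum; i++) {
            // Value length in [1, RANDOM_STRING_MAX_LENGTH].
            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
            StringBuilder value = new StringBuilder(valueLength);
            for (int c = 0; c < valueLength; c++) {
                value.append(ALPHANUMERIC.charAt(random.nextInt(ALPHANUMERIC.length())));
            }
            // Keys are the sequential indices "0", "1", ... as in the quoted code.
            records.add(new SimpleImmutableEntry<>(Integer.toString(i), value.toString()));
        }
        return records;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> a = generateTestData(42L);
        List<Map.Entry<String, String>> b = generateTestData(42L);
        System.out.println("records=" + a.size() + ", reproducible=" + a.equals(b));
    }
}
```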




[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r856141131


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/pom.xml:
##########
@@ -30,10 +30,14 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-elasticsearch7-test</artifactId>
-	<name>Flink : E2E Tests : Elasticsearch 7</name>
+	<artifactId>flink-end-to-end-tests-elasticsearch7</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 7 Java</name>

Review Comment:
   Is `Java` really important here?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/pom.xml:
##########
@@ -43,50 +43,52 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<artifactId>flink-connector-test-utils</artifactId>
 			<version>${project.version}</version>
+			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${testcontainers.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope></dependency>
 	</dependencies>
 
+	<dependencyManagement>
+	   <dependencies>
+		  <dependency>
+			 <groupId>org.apache.httpcomponents</groupId>
+			 <artifactId>httpcore-nio</artifactId>
+			 <version>4.4.12</version>
+		  </dependency>
+	   </dependencies>
+	</dependencyManagement>
+
 	<build>
 		<plugins>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
+				<artifactId>maven-jar-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>Elasticsearch6SinkExample</id>
+						<id>Jar Package</id>
 						<phase>package</phase>
 						<goals>
-							<goal>shade</goal>
+							<goal>test-jar</goal>

Review Comment:
   Can you avoid introducing more test jars and maybe move the common utilities to the main package?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/test/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** Base classs for end to end ElasticsearchSink tests based on connector testing framework. */
+@SuppressWarnings("unused")
+public abstract class ElasticsearchSinkE2ECaseBase<T extends Comparable<T>>
+        extends SinkTestSuiteBase<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkE2ECaseBase.class);
+    private static final int READER_RETRY_ATTEMPTS = 10;
+    private static final int READER_TIMEOUT = 10;
+
+    protected static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv
+    protected FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(getElasticsearchContainerName()))
+                                    .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    @Override
+    protected void checkResultWithSemantic(

Review Comment:
   Can you explain this in more detail? Isn't this only checking that any record was written to elasticsearch?
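To make the question concrete, here is a sketch contrasting the two kinds of check. The override's body is not shown in this hunk, so this does not claim to reproduce it; both method names are hypothetical. `anyRecordWritten` passes as soon as anything was read back, while `matchesExactly` compares sorted copies of the expected and actual data, so a missing or duplicated record fails the check, which is what exactly-once semantics calls for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ResultCheckSketch {

    /** Weak check: passes as soon as anything at all was read back from the sink. */
    static <T> boolean anyRecordWritten(List<T> actual) {
        return !actual.isEmpty();
    }

    /**
     * Strict check: the records read back must equal the test data exactly, ignoring order
     * (no missing records, no duplicates).
     */
    static <T extends Comparable<? super T>> boolean matchesExactly(
            List<T> expected, List<T> actual) {
        List<T> sortedExpected = new ArrayList<>(expected);
        List<T> sortedActual = new ArrayList<>(actual);
        // Sort local copies so the inputs stay untouched and order does not matter.
        Collections.sort(sortedExpected);
        Collections.sort(sortedActual);
        return sortedExpected.equals(sortedActual);
    }

    public static void main(String[] args) {
        List<String> expected = List.of("a", "b", "c");
        List<String> partial = List.of("c", "a");
        // The weak check accepts a partial result; the strict one rejects it.
        System.out.println(anyRecordWritten(partial) + " " + matchesExactly(expected, partial));
    }
}
```

Sorting local copies like this also means a subclass would not need the base class's `sort` helper to become `protected`.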



##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -540,7 +540,7 @@ private boolean compareSinkMetrics(
     }
 
     /** Sort the list. */
-    private List<T> sort(List<T> list) {
+    protected List<T> sort(List<T> list) {

Review Comment:
   Not sure it makes much sense to make this method visible.




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853980008


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/pom.xml:
##########
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch6</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 6 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-elasticsearch-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>shade</goal>

Review Comment:
   This is the equivalent of building a fat jar for the user program. If we were to use the `flink-connector-elasticsearchX` dependency here instead (https://github.com/apache/flink/blob/4015a3a7ae3f6cd6d05def3ac06f8b8dccbf31de/flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java#L64-L71), we'd also have to add every library that the ES connector depends on as a separate JAR (and there are a LOT of dependencies).
   







[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853895298


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value of the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {

Review Comment:
   Do you need the `compareTo`? If you only need the comparison in a single place, it might be easier to use the existing `Tuple2` class and just pass in the respective comparator at the place where you want to compare things.
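   Below is a minimal sketch of the suggested alternative. The holder stays a plain key-value pair, and the ordering is passed in at the place where records are sorted. To keep the sketch self-contained, it uses `java.util.Map.Entry` as a stand-in for Flink's `Tuple2`. With `Tuple2<Integer, String>`, a key-only ordering could be written as `Comparator.comparing((Tuple2<Integer, String> t) -> t.f0)`. `KeyValueOrdering` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: orders plain key-value holders with a comparator supplied at the call site. */
class KeyValueOrdering {

    // Order by key first, then by value, so duplicate keys still sort deterministically.
    static final Comparator<Map.Entry<Integer, String>> BY_KEY_THEN_VALUE =
            Map.Entry.<Integer, String>comparingByKey()
                    .thenComparing(Map.Entry.<Integer, String>comparingByValue());

    // Returns a sorted copy and leaves the input list untouched.
    static List<Map.Entry<Integer, String>> sorted(List<Map.Entry<Integer, String>> records) {
        List<Map.Entry<Integer, String>> copy = new ArrayList<>(records);
        copy.sort(BY_KEY_THEN_VALUE);
        return copy;
    }
}
```

   Because the ordering is defined where it is used, the holder class would no longer need `compareTo` or the `Comparable` bounds on `K` and `V`.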



##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   Please use a more descriptive error message.
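   For context, the `catch (Throwable t)` above appears to sit inside a check that is retried until it passes. A failed attempt is therefore expected and swallowed. The sketch below shows one way to make the failure descriptive without logging on every attempt. It keeps the last failure and reports it only once the deadline expires, together with what was checked and for how long. `PollingCheck.pollUntil` is a hypothetical helper, not an API of the connector testing framework.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a check and, on timeout, reports the last failure with context. */
class PollingCheck {

    static void pollUntil(
            Callable<Boolean> check, Duration timeout, Duration pause, String description) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Throwable lastFailure = null;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                if (Boolean.TRUE.equals(check.call())) {
                    return; // the check passed, nothing to report
                }
            } catch (Throwable t) {
                // Expected while data is still arriving: remember it instead of logging each attempt.
                lastFailure = t;
            }
            if (System.nanoTime() - deadline >= 0) {
                break; // out of time
            }
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError(description + " was interrupted", e);
            }
        }
        // Only now surface the failure, with enough context to see what was being checked.
        throw new AssertionError(
                String.format(
                        "%s did not succeed within %s after %d attempt(s)",
                        description, timeout, attempts),
                lastFailure);
    }
}
```

   In `checkResultWithSemantic`, the check would wrap the existing `matchesRecordsFromSource(...)` assertion and return `true`, as the quoted code appears to do. The description could also name the checkpointing `semantic`.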



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/pom.xml:
##########
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch6</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 6 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-elasticsearch-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>shade</goal>

Review Comment:
   Quick reminder: why do we need shading here?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        // TODO: Tests are flaky without this small delay.

Review Comment:
   I thought we had fixed this problem. Can you explain what is going on here?
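   For reference, the reader above keeps a `from` offset and a `pageLength`, and the clients' `fetchAll` issues a search sorted on `sortField` with `from`/`size`. The quoted hunk stops before `from` is updated, so the following is an assumption. It is a minimal sketch of offset-based paging, with a hypothetical `PageSource` standing in for the Elasticsearch client so it runs without a cluster. It assumes results are sorted on a stable field, so offsets stay meaningful between polls.

```java
import java.util.List;

/** Hypothetical stand-in for a sorted search with from/size paging. */
interface PageSource<T> {
    List<T> fetch(int from, int size);
}

/** Sketch of an offset-based reader: each poll returns the next records not yet seen. */
class PagedReader<T> {
    private final PageSource<T> source;
    private final int pageLength;
    private int from; // offset of the first record that has not been returned yet

    PagedReader(PageSource<T> source, int pageLength) {
        this.source = source;
        this.pageLength = pageLength;
    }

    List<T> poll() {
        List<T> page = source.fetch(from, pageLength);
        // Advance by the number of records actually returned, not by pageLength,
        // so a partially filled page is not skipped on the next poll.
        from += page.size();
        return page;
    }
}
```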



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   Add a log statement to increase the awareness of the error.
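   Below is a minimal sketch of the requested change. It keeps returning an empty list, so the framework simply polls again, but records why the fetch failed. It uses `java.util.logging` only to stay self-contained. In the client itself, the existing SLF4J `LOG` could do the same, e.g. `LOG.warn("Fetching from index {} failed", indexName, e)`. `LoggingFetch` and `Search` are hypothetical stand-ins for `fetchAll` and the `restClient.search(...)` call.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: log a failed fetch but return an empty result so polling continues. */
class LoggingFetch {
    private static final Logger LOG = Logger.getLogger(LoggingFetch.class.getName());

    /** Stand-in for the search call, which may throw IOException. */
    interface Search<T> {
        List<T> run() throws IOException;
    }

    static <T> List<T> fetchOrEmpty(Search<T> search, String indexName) {
        try {
            return search.run();
        } catch (IOException e) {
            // Not fatal for the test (the caller polls again), but the cause stays visible in the logs.
            LOG.log(Level.WARNING, "Fetching records from index " + indexName + " failed; returning no records", e);
            return Collections.emptyList();
        }
    }
}
```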



##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -500,6 +500,7 @@ private void checkResultWithSemantic(
             ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
             throws Exception {
         final ArrayList<T> result = new ArrayList<>();
+        int failed = 0;

Review Comment:
   Is this variable unused?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/pom.xml:
##########
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch7</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 7 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-elasticsearch-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>

Review Comment:
   Same question about the shading here.



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   Again, please add a log statement.



##########
flink-end-to-end-tests/run-nightly-tests.sh:
##########
@@ -199,9 +199,6 @@ function run_group_2 {
 
     run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
 
-    run_test "Elasticsearch (v6.8.20) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.20.tar.gz"

Review Comment:
   It would be great to remove the bash e2e tests in a separate commit.



[GitHub] [flink] fapaul merged pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul merged PR #19405:
URL: https://github.com/apache/flink/pull/19405


[GitHub] [flink] flinkbot commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1092655648

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4837f326cdab8b6c4844c7943c7365416273e589",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4837f326cdab8b6c4844c7943c7365416273e589",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4837f326cdab8b6c4844c7943c7365416273e589 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r861131896


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing ElasticSearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   Thanks for the clarification. Please let me know if the changes from the latest commit are aligned with your idea.



[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1105509822

   @flinkbot run azure
   


[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853967062


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   It is not really an exceptional case: the sink test framework is designed to keep polling until the data is there.
   https://github.com/apache/flink/blob/91c2b8d56bae03c2ce4a50b5c014cea842df9f74/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java#L460
   
   The Kafka sink tests handle it the same way:
   https://github.com/apache/flink/blob/57e3f03ccd719ed772c983ba335517d95f8f3e6a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java#L53-L55



[GitHub] [flink] fapaul commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1103880587

   Are the `Elasticsearch6SinkExample` and `Elasticsearch7SinkExample` still used after removing the bash tests? AFAIK the test suites do not use them.




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r848932798


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch6Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase extends SinkTestSuiteBase<TupleC2<String, String>> {
+    private static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(
+                                                    DockerImageVersions.ELASTICSEARCH_7))
+                                    .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    @TestContext
+    Elasticsearch6SinkExternalContextFactory contextFactory =
+            new Elasticsearch6SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("elasticsearch6-end-to-end-test.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing-elasticsearch.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()

Review Comment:
   Cleaned up.
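For reference, one way to avoid repeating the `.toAbsolutePath().toUri().toURL()` chain for every jar is a small helper that converts a list of paths in one pass. This is a hypothetical sketch, not necessarily the cleanup that was committed; `JarUrls.toUrls` is an illustrative name, and resolving each jar (done with `TestUtils.getResource` in the test above) is left to the caller.

```java
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper collapsing the repeated Path -> URL conversion into one call. */
final class JarUrls {
    private JarUrls() {}

    static List<URL> toUrls(List<Path> jars) {
        return jars.stream().map(JarUrls::toUrl).collect(Collectors.toList());
    }

    private static URL toUrl(Path path) {
        try {
            return path.toAbsolutePath().toUri().toURL();
        } catch (MalformedURLException e) {
            // Wrap the checked exception so the conversion can run inside a stream.
            throw new UncheckedIOException(e);
        }
    }
}
```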





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853967062


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   It is not really an exceptional case: the sink test framework is designed to keep pulling data until it is there.
   https://github.com/apache/flink/blob/91c2b8d56bae03c2ce4a50b5c014cea842df9f74/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java#L460
   
   This is how it is also handled in the Kafka Sink tests:
   https://github.com/apache/flink/blob/57e3f03ccd719ed772c983ba335517d95f8f3e6a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java#L54





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853933757


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   This is not going to make it into the final commit; I left it in for the still-open discussion about test stability without the "sleep" in the reader. Otherwise the exception is simply swallowed in the base tests.
   





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1103777876

   @fapaul I addressed your comments, please take another look.




[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r860639392


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing Elasticsearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   Maybe I missed your change, but `emit` looks the same as before. My idea is to have only one `ElasticsearchTestEmitter` implementing the emitter interface, used by both elasticsearch6 and elasticsearch7. Each module passes the information needed to construct the request to the constructor of `ElasticsearchTestEmitter`. There are also still conflicts with the master branch.
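The shape being suggested might look roughly like this. It is a dependency-free sketch under stated assumptions: `UpsertRequest`, `UpsertRequestFactory`, `Indexer` and `SharedTestEmitter` are hypothetical stand-ins (for the Elasticsearch request classes, a per-version factory, Flink's `RequestIndexer`, and the shared emitter), and the mapping type `"_doc"` for the version 6 module is only an illustration of a version-specific detail. Only the request construction varies; the emit logic exists once.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a version-specific upsert request (hypothetical, not an Elasticsearch class). */
final class UpsertRequest {
    final String index;
    final String type; // null when the target version does not use mapping types
    final String id;
    final Map<String, Object> doc;

    UpsertRequest(String index, String type, String id, Map<String, Object> doc) {
        this.index = index;
        this.type = type;
        this.id = id;
        this.doc = Collections.unmodifiableMap(new HashMap<>(doc));
    }
}

/** Hypothetical factory that each version-specific module passes in. */
interface UpsertRequestFactory {
    UpsertRequest create(String index, String id, Map<String, Object> doc);
}

/** Stand-in for Flink's RequestIndexer. */
interface Indexer {
    void add(UpsertRequest request);
}

/** One shared emitter; only the request construction differs between versions. */
final class SharedTestEmitter {
    private final String indexName;
    private final UpsertRequestFactory requestFactory;

    SharedTestEmitter(String indexName, UpsertRequestFactory requestFactory) {
        this.indexName = indexName;
        this.requestFactory = requestFactory;
    }

    void emit(int key, String value, Indexer indexer) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("value", value); // the client's fetchAll() reads the "value" field
        // The key doubles as the document id, so re-emitting a record overwrites it.
        indexer.add(requestFactory.create(indexName, String.valueOf(key), doc));
    }
}
```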





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1106341694

   @flinkbot run azure
   




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853934778


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        // TODO: Tests are flaky without this small delay.

Review Comment:
   https://github.com/apache/flink/pull/19405#discussion_r848945151
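The reader above keeps a `from` offset next to `pageLength`; the body of `poll` is elided here. One plausible way the two fit together is sketched below with a hypothetical `PageFetcher` standing in for `ElasticsearchClient.fetchAll(QueryParams)`: each poll requests the next page and advances the offset only by the number of hits actually returned, so records that are not yet visible are picked up by a later poll. This assumes a stable sort order, which `fetchAll` gets by sorting on `params.sortField()`.

```java
import java.util.List;

/** Hypothetical stand-in for fetchAll(QueryParams): up to `size` records starting at `from`. */
interface PageFetcher<T> {
    List<T> fetch(int from, int size);
}

/** Hypothetical paging reader that resumes where the previous poll stopped. */
final class PagingReader<T> {
    private final PageFetcher<T> fetcher;
    private final int pageLength;
    private int from; // offset of the first record not yet returned

    PagingReader(PageFetcher<T> fetcher, int pageLength) {
        this.fetcher = fetcher;
        this.pageLength = pageLength;
    }

    List<T> poll() {
        List<T> page = fetcher.fetch(from, pageLength);
        from += page.size(); // advance only by what was actually returned
        return page;
    }
}
```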





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853968153


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -500,6 +500,7 @@ private void checkResultWithSemantic(
             ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
             throws Exception {
         final ArrayList<T> result = new ArrayList<>();
+        int failed = 0;

Review Comment:
   Leftover from debugging. Removed.





[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854077355


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        // TODO: Tests are flaky without this small delay.

Review Comment:
   This needs to be fixed, because the current delay can easily make the tests flaky in the future. Did you try setting the refresh policy [1] in the emitters so that every write waits until the index is refreshed?
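Why the refresh policy matters can be shown with a toy model, not Elasticsearch itself: writes are buffered and only become searchable after a refresh, so a reader that searches right after a plain write may see nothing, while a write that waits for visibility is searchable as soon as it returns. In the high-level REST client the policy is set per write request, e.g. `indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)`; whether the sink's bulk path honours a per-request policy would still need to be checked. `ToyIndex` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy near-real-time index: writes are buffered and only become searchable after a refresh. */
final class ToyIndex {
    private final List<String> buffered = new ArrayList<>();
    private final List<String> searchable = new ArrayList<>();

    /** Models a periodic refresh: moves buffered writes into the searchable view. */
    synchronized void refresh() {
        searchable.addAll(buffered);
        buffered.clear();
    }

    /**
     * Writes a document. With waitUntilVisible=true the call returns only once the document is
     * searchable. The toy forces a refresh to achieve this; WAIT_UNTIL in Elasticsearch instead
     * waits for the next scheduled refresh without forcing one.
     */
    synchronized void write(String doc, boolean waitUntilVisible) {
        buffered.add(doc);
        if (waitUntilVisible) {
            refresh();
        }
    }

    synchronized List<String> search() {
        return new ArrayList<>(searchable);
    }
}
```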
   
   [1] https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html#_optional_arguments





[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854069305


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value of the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {

Review Comment:
   Ah, I see, thanks for the hint. WDYT about adding a `compareTo` method to `Tuple2`?
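The ordering matters because the test suite sorts the expected data (`sort(testData)` in `checkResultWithSemantic`) before matching. The body of `KeyValue.compareTo` is elided in the hunk above; the following hypothetical `ComparablePair` is only a sketch of one consistent key-then-value ordering, with `equals` and `hashCode` kept consistent with `compareTo`.

```java
import java.util.Objects;

/** Hypothetical comparable pair, ordered by key first and then by value. */
final class ComparablePair<K extends Comparable<? super K>, V extends Comparable<? super V>>
        implements Comparable<ComparablePair<K, V>> {
    final K key;
    final V value;

    ComparablePair(K key, V value) {
        this.key = Objects.requireNonNull(key);
        this.value = Objects.requireNonNull(value);
    }

    @Override
    public int compareTo(ComparablePair<K, V> other) {
        int byKey = key.compareTo(other.key);
        // Fall back to the value only when the keys are equal.
        return byKey != 0 ? byKey : value.compareTo(other.value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ComparablePair)) {
            return false;
        }
        ComparablePair<?, ?> that = (ComparablePair<?, ?>) o;
        return key.equals(that.key) && value.equals(that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }
}
```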



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r861131896


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing Elasticsearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   Thanks for the clarification. Please let me know if the changes from the topmost commit are aligned with your idea.





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1097368326

   @flinkbot run azure




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853968674


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   Please note that without this log, the tests simply hang, without any visible error, if you run them without the "sleep" in the reader.
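An alternative to logging inside the `catch` block is to remember the most recent failure and attach it as the cause when the deadline expires, so a stuck check fails with the underlying error instead of timing out without context. This is a hypothetical sketch; `RetryingCheck.await` is an illustrative name, not an API of the connector testing framework.

```java
import java.time.Duration;

/** Hypothetical retry helper that does not swallow the last failure. */
final class RetryingCheck {
    private RetryingCheck() {}

    /**
     * Re-runs {@code check} until it completes normally or {@code timeout} expires. On timeout,
     * throws an AssertionError whose cause is the last failure seen (null if none was recorded).
     */
    static void await(Runnable check, Duration timeout, Duration pause) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Throwable last = null;
        while (System.nanoTime() < deadline) {
            try {
                check.run();
                return; // condition met
            } catch (Throwable t) {
                last = t; // keep it instead of discarding it
            }
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new AssertionError("Condition not met within " + timeout, last);
    }
}
```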





[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854078618


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   I see this as a problem in the KafkaDataReader as well: swallowing an exception is never a good idea. Adding a simple log statement like `Fetching records failed`, e
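A minimal sketch of the reviewer's point, assuming `java.util.logging` in place of the SLF4J `LOG` used in the PR so that it runs without dependencies (with SLF4J the equivalent call would be `LOG.error("Fetching records failed", e)`). `RecordFetcher` and `LoggingReader` are hypothetical names. The empty-list fallback is kept so that a polling caller can still retry; the difference is that the cause is no longer lost.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for a remote search call that may fail with an IOException. */
interface RecordFetcher {
    List<String> fetch() throws IOException;
}

/** Sketch of a reader that logs fetch failures instead of silently swallowing them. */
final class LoggingReader {
    private static final Logger LOG = Logger.getLogger(LoggingReader.class.getName());

    private LoggingReader() {}

    /** Returns the fetched records, or an empty list if the fetch failed (the failure is logged). */
    static List<String> fetchAll(RecordFetcher fetcher) {
        try {
            return fetcher.fetch();
        } catch (IOException e) {
            // Keep the "empty result on failure" contract so a polling caller can retry,
            // but record the cause so that a failing test run is diagnosable.
            LOG.log(Level.WARNING, "Fetching records failed", e);
            return Collections.emptyList();
        }
    }
}
```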





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1103958691

   > Did you address this?

   Pardon, I missed the top review comment. Fixed now.




[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854259322


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value of the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {

Review Comment:
   Let's keep it as it currently is.





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r860177880


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing ElasticSearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   Fixed `emit` and `createDoc`, but I don't quite understand which composition structure you are proposing.
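The reviewer's exact proposal is not quoted here. One possible reading, sketched under that assumption, is to replace the interface with default methods by a single emitter class that receives the index name and the document-building function through its constructor. `UpsertIndexer` is a hypothetical stand-in for Flink's `RequestIndexer`, and `TestRecord` stands in for `KeyValue<Integer, String>`; the `"value"` field and the key-as-document-id convention mirror what `fetchAll` in `Elasticsearch6Client` reads back.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for Flink's RequestIndexer, reduced to a single upsert call. */
interface UpsertIndexer {
    void upsert(String index, String id, Map<String, Object> document);
}

/** Minimal element type standing in for KeyValue<Integer, String>. */
final class TestRecord {
    final int key;
    final String value;

    TestRecord(int key, String value) {
        this.key = key;
        this.value = value;
    }
}

/** Emitter built by composition: collaborators are injected rather than inherited. */
final class ComposedEmitter {
    private final String indexName;
    private final Function<TestRecord, Map<String, Object>> documentFactory;

    ComposedEmitter(String indexName, Function<TestRecord, Map<String, Object>> documentFactory) {
        this.indexName = indexName;
        this.documentFactory = documentFactory;
    }

    /** Turns one element into one upsert request, using the element's key as document id. */
    void emit(TestRecord element, UpsertIndexer indexer) {
        indexer.upsert(indexName, String.valueOf(element.key), documentFactory.apply(element));
    }

    /** A default document layout storing the element's value under the "value" field. */
    static Map<String, Object> valueDocument(TestRecord element) {
        Map<String, Object> document = new HashMap<>();
        document.put("value", element.value);
        return document;
    }
}
```

With this shape, the Elasticsearch 6 and 7 variants would differ only in the arguments passed to the constructor, not in a type hierarchy.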





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r848946159


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/pom.xml:
##########
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch7</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 7 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<!--		<dependency>

Review Comment:
   I restructured the modules in the new version and cleaned up the POMs.





[GitHub] [flink] fapaul commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1103881566

   > I think we should be consistent in naming the common modules. The Kafka module is called `[...]common-kafka` and yours is called `[...]elasticsearch-common`.
   
   Did you address this?
   
   




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854197382


##########
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java:
##########
@@ -508,6 +509,7 @@ private void checkResultWithSemantic(
                                 .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
                         return true;
                     } catch (Throwable t) {
+                        LOG.error("Ooops", t);

Review Comment:
   Removed from PR.
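The debug log was needed because the result check in the hunk above catches `Throwable` and discards it while the test keeps retrying. If the cause is wanted without ad-hoc logging, one option (an assumption, not what the testing framework currently does) is to remember the last failure and attach it as the cause of the final timeout error. `RetryingCheck` and `waitUntilPasses` are hypothetical names.

```java
import java.time.Duration;

/** Sketch: retry a check until it passes, keeping the last failure for the final error. */
final class RetryingCheck {
    private RetryingCheck() {}

    static void waitUntilPasses(Runnable check, Duration timeout, Duration pause)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        Throwable lastFailure = null;
        while (true) {
            try {
                check.run();
                return; // the check passed
            } catch (Throwable t) {
                // Mirror the original catch-all, but remember the failure instead of dropping it.
                lastFailure = t;
            }
            if (System.nanoTime() - deadline >= 0) {
                // Surface the most recent failure as the cause of the timeout.
                throw new AssertionError("Check did not pass within " + timeout, lastFailure);
            }
            Thread.sleep(pause.toMillis());
        }
    }
}
```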





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853936471


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value of the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {

Review Comment:
   This is baked into the testing framework:
   https://github.com/apache/flink/blob/91c2b8d56bae03c2ce4a50b5c014cea842df9f74/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java#L518
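The linked line (also visible in the `SinkTestSuiteBase` hunk above as `sort(testData)`) sorts the generated test data before comparing it with the records read back from Elasticsearch, which is why the record type must be `Comparable`. The body of the PR's `compareTo` is not shown here, so the following is only a sketch of one natural ordering, key first and then value, under the extra assumption that neither field is null. `ComparableKeyValue` is a hypothetical name chosen to avoid implying it is the PR's `KeyValue`.

```java
import java.io.Serializable;
import java.util.Objects;

/** Sketch of a comparable key-value holder; both fields are assumed non-null. */
final class ComparableKeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
        implements Comparable<ComparableKeyValue<K, V>>, Serializable {
    private static final long serialVersionUID = 1L;

    final K key;
    final V value;

    private ComparableKeyValue(K key, V value) {
        this.key = Objects.requireNonNull(key, "key");
        this.value = Objects.requireNonNull(value, "value");
    }

    static <K extends Comparable<? super K>, V extends Comparable<? super V>>
            ComparableKeyValue<K, V> of(K key, V value) {
        return new ComparableKeyValue<>(key, value);
    }

    /** Orders by key first and breaks ties by value, so sorting is deterministic. */
    @Override
    public int compareTo(ComparableKeyValue<K, V> other) {
        int byKey = key.compareTo(other.key);
        return byKey != 0 ? byKey : value.compareTo(other.value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ComparableKeyValue)) {
            return false;
        }
        ComparableKeyValue<?, ?> that = (ComparableKeyValue<?, ?>) o;
        return key.equals(that.key) && value.equals(that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }

    @Override
    public String toString() {
        return "(" + key + ", " + value + ")";
    }
}
```

Keeping `compareTo` consistent with `equals` matters here: after sorting, the framework compares element by element, so two records that compare as equal but are not `equals` would cause spurious mismatches.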





[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r846017025


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/pom.xml:
##########
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch6</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 6 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>1.16-SNAPSHOT</version>

Review Comment:
   I guess all occurrences of `1.16-SNAPSHOT` should be `${project.version}`.



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/com/apache/flink/streaming/tests/TupleC2.java:
##########
@@ -0,0 +1,30 @@
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** A {@link Comparable} implementation of {@link Tuple2}. */
+public class TupleC2<T0 extends Comparable<? super T0>, T1 extends Comparable<? super T1>>

Review Comment:
   I would much rather have a dedicated class for the test results than introduce a new `Tuple` class.



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/pom.xml:
##########
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch6</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 6 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common-elasticsearch</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>Elasticsearch6SinkExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>elasticsearch6-end-to-end-test</finalName>
+							<outputDirectory>dependencies</outputDirectory>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>
+										org.apache.flink.streaming.tests.Elasticsearch6SinkExample

Review Comment:
   Is `Elasticsearch6SinkExample` still used anywhere? If not, we can remove most of the complexity from the POM.



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6DataReader.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Elasticsearch data reader. */
+public class Elasticsearch6DataReader implements ExternalSystemDataReader<TupleC2<String, String>> {
+    private final RestHighLevelClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from = 0;
+
+    public Elasticsearch6DataReader(RestHighLevelClient client, String indexName, int pageLength) {
+        this.client = client;
+        this.indexName = indexName;
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<TupleC2<String, String>> poll(Duration timeout) {
+        List<TupleC2<String, String>> result = new ArrayList<>();
+        try {
+            SearchResponse response =
+                    client.search(
+                            new SearchRequest(indexName)
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .query(QueryBuilders.matchAllQuery())
+                                                    .from(from)
+                                                    .size(pageLength)
+                                                    .trackTotalHits(true)),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            for (SearchHit searchHit : searchHits) {
+                TupleC2<String, String> hit =
+                        TupleC2.of(
+                                searchHit.getId(),
+                                searchHit.getSourceAsMap().get("data").toString());
+                result.add(hit);
+            }
+            from += searchHits.length;
+            return result;
+        } catch (IOException e) {
+            return Collections.emptyList();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {

Review Comment:
   Can the client be null?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** End-to-end test for the Elasticsearch 6 sink based on the connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase extends SinkTestSuiteBase<TupleC2<String, String>> {
+    private static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(
+                                                    DockerImageVersions.ELASTICSEARCH_7))

Review Comment:
   ```suggestion
                                                       DockerImageVersions.ELASTICSEARCH_6))
   ```



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6DataReader.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Elasticsearch data reader. */
+public class Elasticsearch6DataReader implements ExternalSystemDataReader<TupleC2<String, String>> {
+    private final RestHighLevelClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from = 0;
+
+    public Elasticsearch6DataReader(RestHighLevelClient client, String indexName, int pageLength) {
+        this.client = client;
+        this.indexName = indexName;
+        this.pageLength = pageLength;

Review Comment:
   Please add `checkNotNull` for the non-nullable parameters.
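A minimal sketch of this suggestion, which also answers the earlier question of whether the client can be null in `close()`: once the constructor rejects null, `close()` no longer needs a guard. To stay self-contained it uses `java.util.Objects.requireNonNull`, which, like Flink's `Preconditions.checkNotNull` (already imported in `Elasticsearch6Client`), throws a `NullPointerException`. `Closeable` stands in for `RestHighLevelClient`, `ValidatingDataReader` is a hypothetical name, and the positive-`pageLength` check is an extra assumption.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;

/** Hypothetical reader skeleton; the Closeable client stands in for RestHighLevelClient. */
final class ValidatingDataReader implements AutoCloseable {
    private final Closeable client;
    private final String indexName;
    private final int pageLength;

    ValidatingDataReader(Closeable client, String indexName, int pageLength) {
        // Fail fast at construction time instead of with an NPE deep inside poll() or close().
        this.client = Objects.requireNonNull(client, "client must not be null");
        this.indexName = Objects.requireNonNull(indexName, "indexName must not be null");
        if (pageLength <= 0) {
            throw new IllegalArgumentException("pageLength must be positive, got " + pageLength);
        }
        this.pageLength = pageLength;
    }

    String indexName() {
        return indexName;
    }

    int pageLength() {
        return pageLength;
    }

    @Override
    public void close() throws IOException {
        // No null check needed: the constructor guarantees client != null.
        client.close();
    }
}
```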



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    @Override
+    public Sink<TupleC2<String, String>> createSink(TestingSinkSettings sinkSettings) {
+        return new Elasticsearch6SinkBuilder<TupleC2<String, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new Elasticsearch6TestEmitter(indexName))
+                .setBulkFlushMaxActions(100) // emit after every element, don't buffer
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<TupleC2<String, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        Elasticsearch6Utils.refreshIndex(client, indexName);
+        return new Elasticsearch6DataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public List<TupleC2<String, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<TupleC2<String, String>> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        for (int i = 0; i < recordNum; i++) {
+            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            String key = Integer.toString(i);
+            String value = RandomStringUtils.random(valueLength, true, true);
+            randomStringRecords.add(TupleC2.of(key, value));
+        }
+        return randomStringRecords;

Review Comment:
   Nit: Again, please use the stream syntax.
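One way the loop in `generateTestData` could look with the stream syntax, as a sketch under a few assumptions: `Map.Entry` stands in for the test's `TupleC2` type, and the hypothetical helper `randomAlphanumeric` replaces `RandomStringUtils.random(valueLength, true, true)`. The helper draws characters from the seeded `Random`; the three-argument `RandomStringUtils.random` overload draws from commons-lang3's own internal generator instead, so in the original only the record count and the value lengths follow from `seed`, not the characters themselves.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class TestDataGenerator {
    private static final String ALPHANUMERIC =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    private static final int RANDOM_STRING_MAX_LENGTH = 50;
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;

    /** Stream-based variant of generateTestData; Map.Entry stands in for TupleC2. */
    static List<Map.Entry<String, String>> generateTestData(long seed) {
        Random random = new Random(seed);
        int recordNum =
                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
                        + NUM_RECORDS_LOWER_BOUND;
        // Sequential stream: the Random is consumed in the same order on every run.
        return IntStream.range(0, recordNum)
                .<Map.Entry<String, String>>mapToObj(
                        i ->
                                new SimpleImmutableEntry<>(
                                        Integer.toString(i),
                                        randomAlphanumeric(
                                                random,
                                                random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1)))
                .collect(Collectors.toList());
    }

    /** Hypothetical replacement for RandomStringUtils.random(length, true, true). */
    private static String randomAlphanumeric(Random random, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHANUMERIC.charAt(random.nextInt(ALPHANUMERIC.length())));
        }
        return sb.toString();
    }
}
```

Because the stream is sequential and each mapping step consumes the `Random` in a fixed order, the same seed always yields the same list.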



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    @Override
+    public Sink<TupleC2<String, String>> createSink(TestingSinkSettings sinkSettings) {
+        return new Elasticsearch6SinkBuilder<TupleC2<String, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new Elasticsearch6TestEmitter(indexName))
+                .setBulkFlushMaxActions(100) // emit after every element, don't buffer
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<TupleC2<String, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        Elasticsearch6Utils.refreshIndex(client, indexName);
+        return new Elasticsearch6DataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public List<TupleC2<String, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<TupleC2<String, String>> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        for (int i = 0; i < recordNum; i++) {
+            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            String key = Integer.toString(i);
+            String value = RandomStringUtils.random(valueLength, true, true);
+            randomStringRecords.add(TupleC2.of(key, value));
+        }
+        return randomStringRecords;
+    }
+
+    @Override
+    public void close() {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            client.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        Elasticsearch6Utils.refreshIndex(client, indexName);

Review Comment:
   Out of curiosity: since the index is refreshed every time you instantiate the DataReader, can you give some more details about the race condition?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/pom.xml:
##########
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-elasticsearch7</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch 7 Java</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>1.16-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<!--		<dependency>

Review Comment:
   Why does this module not depend on the Elasticsearch common module?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** End-to-end test for Elasticsearch6Sink based on the connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase extends SinkTestSuiteBase<TupleC2<String, String>> {
+    private static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(
+                                                    DockerImageVersions.ELASTICSEARCH_7))
+                                    .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    @TestContext
+    Elasticsearch6SinkExternalContextFactory contextFactory =
+            new Elasticsearch6SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("elasticsearch6-end-to-end-test.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing-elasticsearch.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()

Review Comment:
   Can you explain why you need all three jars?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Elasticsearch sink external context factory. */
+public class Elasticsearch6SinkExternalContextFactory
+        implements ExternalContextFactory<Elasticsearch6SinkExternalContext> {
+
+    private final ElasticsearchContainer elasticsearchContainer;
+    private final List<URL> connectorJars;
+
+    Elasticsearch6SinkExternalContextFactory(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        this.elasticsearchContainer = elasticsearchContainer;
+        this.connectorJars = connectorJars;

Review Comment:
   `checkNotNull`



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6DataReader.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Elasticsearch data reader. */
+public class Elasticsearch6DataReader implements ExternalSystemDataReader<TupleC2<String, String>> {
+    private final RestHighLevelClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from = 0;
+
+    public Elasticsearch6DataReader(RestHighLevelClient client, String indexName, int pageLength) {
+        this.client = client;
+        this.indexName = indexName;
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<TupleC2<String, String>> poll(Duration timeout) {
+        List<TupleC2<String, String>> result = new ArrayList<>();
+        try {
+            SearchResponse response =
+                    client.search(
+                            new SearchRequest(indexName)
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .query(QueryBuilders.matchAllQuery())
+                                                    .from(from)
+                                                    .size(pageLength)
+                                                    .trackTotalHits(true)),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            for (SearchHit searchHit : searchHits) {
+                TupleC2<String, String> hit =
+                        TupleC2.of(
+                                searchHit.getId(),
+                                searchHit.getSourceAsMap().get("data").toString());
+                result.add(hit);

Review Comment:
   Nit: I think you can replace this whole block with the Java Streams API:
   ```java
   from += searchHits.length;
   return Arrays.stream(searchHits)
       .map(...)
       .collect(...);
   ```
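For illustration, a self-contained version of the suggested `poll` body. The hypothetical `Hit` class stands in for `SearchHit` (document id plus `_source` map), `Map.Entry` stands in for `TupleC2`, and the `search` function stands in for the `from`/`size` search request; none of these names come from the PR.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

final class PagedReader {
    /** Hypothetical stand-in for SearchHit: the document id plus its _source map. */
    static final class Hit {
        final String id;
        final Map<String, Object> source;

        Hit(String id, Map<String, Object> source) {
            this.id = id;
            this.source = source;
        }
    }

    // (from, size) -> one page of hits; stands in for the search request in poll()
    private final BiFunction<Integer, Integer, Hit[]> search;
    private final int pageLength;
    private int from = 0;

    PagedReader(BiFunction<Integer, Integer, Hit[]> search, int pageLength) {
        this.search = search;
        this.pageLength = pageLength;
    }

    List<Map.Entry<String, String>> poll() {
        Hit[] searchHits = search.apply(from, pageLength);
        // Advance the offset so the next poll() requests the following page.
        from += searchHits.length;
        return Arrays.stream(searchHits)
                .<Map.Entry<String, String>>map(
                        hit -> new SimpleImmutableEntry<>(hit.id, hit.source.get("data").toString()))
                .collect(Collectors.toList());
    }
}
```

Advancing `from` by the number of hits actually returned (rather than by `pageLength`) keeps the offset correct when the last page is short. Elasticsearch caps `from + size` at `index.max_result_window` (10,000 by default), which is well above the fewer than 500 records generated in this test.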



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;

Review Comment:
   `checkNotNull`



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    @Override
+    public Sink<TupleC2<String, String>> createSink(TestingSinkSettings sinkSettings) {
+        return new Elasticsearch6SinkBuilder<TupleC2<String, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new Elasticsearch6TestEmitter(indexName))
+                .setBulkFlushMaxActions(100) // emit after every element, don't buffer
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<TupleC2<String, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        Elasticsearch6Utils.refreshIndex(client, indexName);

Review Comment:
   Would it make sense to move the refresh method into `Elasticsearch6DataReader#poll` ?
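A minimal sketch of what that could look like, assuming the reader is handed the refresh step as a `Runnable` (in the PR's terms something like `() -> Elasticsearch6Utils.refreshIndex(client, indexName)`) and the search as a `Supplier`; `RefreshingReader` is a hypothetical name. Since Elasticsearch only makes newly indexed documents visible to search after a refresh, refreshing at the start of every `poll` means each poll also sees documents written after the reader was created.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical reader that refreshes the index before every search. */
final class RefreshingReader<T> {
    private final Runnable refreshIndex; // e.g. () -> Elasticsearch6Utils.refreshIndex(client, indexName)
    private final Supplier<List<T>> searchPage; // stands in for the search request in poll()

    RefreshingReader(Runnable refreshIndex, Supplier<List<T>> searchPage) {
        this.refreshIndex = refreshIndex;
        this.searchPage = searchPage;
    }

    List<T> poll() {
        // Refresh first, so documents indexed since the previous poll become searchable.
        refreshIndex.run();
        return searchPage.get();
    }
}
```

The trade-off is one extra refresh request per poll; for a test with a few hundred records that cost is negligible.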



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6Utils.java:
##########
@@ -0,0 +1,43 @@
+package com.apache.flink.streaming.tests;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class Elasticsearch6Utils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Utils.class);
+
+    static void refreshIndex(RestHighLevelClient client, String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            client.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    static void flushIndex(RestHighLevelClient client, String indexName) {

Review Comment:
   This method seems to be unused.




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r848945151


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/test/java/com/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+class Elasticsearch6SinkExternalContext
+        implements DataStreamSinkV2ExternalContext<TupleC2<String, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch6SinkExternalContext.class);
+
+    private static final String INDEX_NAME_PREFIX = "es-index";
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int PAGE_LENGTH = 100;
+
+    protected final String indexName;
+
+    private final String addressInternal;
+    private final List<URL> connectorJarPaths;
+    private final RestHighLevelClient client;
+
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        this.addressInternal = addressInternal;
+        this.connectorJarPaths = connectorJarPaths;
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.client = new RestHighLevelClient(restClientBuilder);
+    }
+
+    @Override
+    public Sink<TupleC2<String, String>> createSink(TestingSinkSettings sinkSettings) {
+        return new Elasticsearch6SinkBuilder<TupleC2<String, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new Elasticsearch6TestEmitter(indexName))
+                .setBulkFlushMaxActions(100) // emit after every element, don't buffer
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<TupleC2<String, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        Elasticsearch6Utils.refreshIndex(client, indexName);
+        return new Elasticsearch6DataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public List<TupleC2<String, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<TupleC2<String, String>> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        for (int i = 0; i < recordNum; i++) {
+            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            String key = Integer.toString(i);
+            String value = RandomStringUtils.random(valueLength, true, true);
+            randomStringRecords.add(TupleC2.of(key, value));
+        }
+        return randomStringRecords;
+    }
+
+    @Override
+    public void close() {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            client.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        Elasticsearch6Utils.refreshIndex(client, indexName);

Review Comment:
   The tests that verify recovery/rescaling split the data into two chunks: they first verify that the first chunk was written successfully and then, after the restart, verify the second one. Both chunks are written to the same index.
   In general, a refresh makes newly indexed documents visible to search by opening a new Lucene segment. There is an inherent race condition because the reader starts as soon as the Flink job has finished. If this happens before the data has been "processed" on the ES side, the reader will miss it. Unfortunately, despite the active refresh, the tests only seem to become stable with an additional short sleep. This contradicts the "refresh" contract; I cannot explain why it happens.
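   A common alternative to a fixed sleep is to poll until the expected data is visible or a deadline passes. The following is a minimal, JDK-only sketch; `PollUntil` and `await` are hypothetical names and are not part of Flink or of this PR.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a check until it succeeds or a deadline passes. */
final class PollUntil {

    private PollUntil() {}

    /**
     * Evaluates {@code condition} repeatedly, sleeping {@code interval} between attempts.
     *
     * @return true if the condition held before {@code timeout} elapsed, false on timeout or
     *     interruption
     */
    static boolean await(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of System.nanoTime() values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers can observe it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulates data that only becomes visible on the third attempt.
        int[] attemptsUntilVisible = {3};
        boolean visible =
                await(
                        () -> --attemptsUntilVisible[0] <= 0,
                        Duration.ofSeconds(1),
                        Duration.ofMillis(10));
        System.out.println(visible);
    }
}
```

   In the data reader, the condition could refresh the index and then check whether the expected number of records is returned, with the `timeout` passed to `poll` serving as the deadline. This keeps the refresh call but bounds the wait explicitly instead of relying on a guessed sleep duration.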





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1097361070

   Thanks a lot for your review @fapaul. I originally did not want to invest much time in addressing code duplication because I was under the impression that the ES6-related tests would be removed soon after the connector externalization. However, I see your point. At the very least, when ES8 is introduced, we will be able to reuse this setup.
   I reworked the structure and extracted everything up to the level at which many concrete Elasticsearch classes come into play. Further deduplication between `Elasticsearch6Client` and `Elasticsearch7Client` is possible, but it would require reintroducing a facade over a large set of low-level ES abstractions, which does not seem worth it.




[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854203781


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+    private int from;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        // TODO: Tests are flaky without this small delay.

Review Comment:
   I will try, but I suspect the issue manifests in the window between the Flink job finishing its writes and the test immediately starting the reader, which somehow still does not see the written data despite an active, blocking index refresh. Since the policy relates to writes, not reads, I do not expect it to change much.





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r853968891


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            return Collections.emptyList();

Review Comment:
   Same as https://github.com/apache/flink/pull/19405#discussion_r853967062





[GitHub] [flink] afedulov commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r854161245


##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {

Review Comment:
   That could work, but it would make `Tuple2` somewhat special, and I am not sure we want that for consistency reasons. An alternative is to make `Tuple` comparable, but that would mean adding `compareTo` to 73 classes, including test utilities across the framework. A separate `KeyValue` class seems like a good alternative to both approaches. WDYT?
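   The test suite sorts records before matching them against the expected data (`sort(testData)` in `SinkTestSuiteBase`), which is why the record type has to be `Comparable`. The `compareTo` body is cut off in the diff above, so the following is only a minimal sketch of one plausible ordering (by key, then by value), assuming both fields are non-null; it is not necessarily what the PR implements.

```java
import java.io.Serializable;
import java.util.Objects;

/** Sketch of a comparable key-value holder; orders by key first, then by value. */
class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
        implements Comparable<KeyValue<K, V>>, Serializable {

    private static final long serialVersionUID = 1L;

    /** The key of the key-value pair (assumed non-null when comparing). */
    public K key;
    /** The value of the key-value pair (assumed non-null when comparing). */
    public V value;

    /** Creates a new key-value pair where all fields are null. */
    public KeyValue() {}

    private KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public static <K extends Comparable<? super K>, V extends Comparable<? super V>>
            KeyValue<K, V> of(K key, V value) {
        return new KeyValue<>(key, value);
    }

    @Override
    public int compareTo(KeyValue<K, V> other) {
        int byKey = key.compareTo(other.key);
        // Fall back to the value so that pairs with equal keys but different values
        // are not treated as equal.
        return byKey != 0 ? byKey : value.compareTo(other.value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KeyValue)) {
            return false;
        }
        KeyValue<?, ?> that = (KeyValue<?, ?>) o;
        return Objects.equals(key, that.key) && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }

    @Override
    public String toString() {
        return "(" + key + "," + value + ")";
    }
}
```

   Keeping this ordering local to a small test class avoids touching `Tuple2` or the whole `Tuple` hierarchy.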





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1103978867

   > Are the Elasticsearch6SinkExample and Elasticsearch7SinkExample still used after removing the bash tests? AFAIK the test suites do not use it.
   
   Good point. Cleaned up.




[GitHub] [flink] fapaul commented on a diff in pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r859457944


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/pom.xml:
##########
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.16-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-end-to-end-tests-common-elasticsearch</artifactId>
+	<name>Flink : E2E Tests : Elasticsearch Common</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${testcontainers.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+	<dependencyManagement>
+	   <dependencies>
+		  <dependency>
+			 <groupId>org.apache.httpcomponents</groupId>

Review Comment:
   Can you add a small comment why this is necessary?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing ElasticSearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   I think it would be good to revisit the current `TestEmitter` structure. You could replace the inheritance with composition by implementing a single class that takes the logic for building the `UpdateRequest` as a constructor parameter. Currently, `emit` is marked as `default` although all implementors override it, and sharing `createDoc` via a default method also feels strange because it could simply be a static method.
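   A minimal sketch of the composition approach follows. To keep it compilable without Elasticsearch on the classpath, it uses simplified, hypothetical stand-ins: `RequestFactory` plays the role of the version-specific code that builds an `UpdateRequest`, and a `Consumer` plays the role of the `RequestIndexer`. The document id is the record key and the payload is stored under a `value` field, mirroring what `Elasticsearch7Client.fetchAll` reads back; the extra `key` field in the document is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for the version-specific logic that builds an update request. */
@FunctionalInterface
interface RequestFactory<R> {
    R create(String index, String id, Map<String, Object> document);
}

/** A single emitter class configured via composition instead of per-version subclasses. */
final class TestEmitter<R> {

    private final String indexName;
    private final RequestFactory<R> requestFactory;

    TestEmitter(String indexName, RequestFactory<R> requestFactory) {
        this.indexName = indexName;
        this.requestFactory = requestFactory;
    }

    /** Builds the document source; static because it needs no instance state. */
    static Map<String, Object> createDoc(int key, String value) {
        Map<String, Object> document = new HashMap<>();
        document.put("key", key);
        document.put("value", value);
        return document;
    }

    /** Builds one request per record, using the key as document id, and hands it on. */
    void emit(int key, String value, Consumer<R> indexer) {
        indexer.accept(
                requestFactory.create(indexName, Integer.toString(key), createDoc(key, value)));
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // A String "request" is enough to show the wiring; real code would build an UpdateRequest.
        TestEmitter<String> emitter =
                new TestEmitter<>(
                        "es-index", (index, id, doc) -> index + "/" + id + " -> " + doc.get("value"));
        emitter.emit(1, "hello", sent::add);
        System.out.println(sent);
    }
}
```

   With this shape, the ES6 and ES7 modules would each pass their own factory to the same emitter class rather than implementing an interface whose default methods they then override.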



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName);

Review Comment:
   Same as for the other client, please include the exception in the log message.



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context. */
+abstract class ElasticsearchSinkExternalContextBase
+        implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> {
+    /** The constant INDEX_NAME_PREFIX. */
+    protected static final String INDEX_NAME_PREFIX = "es-index";
+
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    protected static final int BULK_BUFFER = 100;
+    protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1;
+    /** The index name. */
+    protected final String indexName;
+
+    /** The address reachable from Flink (internal to the testing environment). */
+    protected final String addressInternal;
+
+    /** The connector jar paths. */
+    protected final List<URL> connectorJarPaths;
+
+    /** The client. */
+    protected final ElasticsearchClient client;
+
+    /**
+     * Instantiates a new Elasticsearch sink context base.
+     *
+     * @param addressInternal The address to access Elasticsearch from within Flink. When running in
+     *     a containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     * @param client The Elasticsearch client.
+     */
+    ElasticsearchSinkExternalContextBase(
+            String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) {
+        this.addressInternal = checkNotNull(addressInternal);
+        this.connectorJarPaths = checkNotNull(connectorJarPaths);
+        this.client = checkNotNull(client);
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<KeyValue<Integer, String>> randomStringRecords = new ArrayList<>();

Review Comment:
   Unused?
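   The method this comment is attached to receives a `seed` and builds `Random random = new Random(seed)`. If every random choice (record count, string lengths, characters) is drawn from that single instance, a failing run can be reproduced from its seed. The following is a hypothetical, JDK-only sketch of such a generator, not the PR's implementation (its body is cut off here). `KeyValue` is a stand-in class, the bounds reuse the constants declared in the diff above, and hand-built strings from plain `Random` replace the commons-lang3 `RandomStringUtils` that the file imports.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

/** Hypothetical sketch: every random choice comes from one seeded Random, so a run can be replayed. */
public class SeededTestData {

    // Bounds copied from the constants declared in ElasticsearchSinkExternalContextBase.
    static final int RANDOM_STRING_MAX_LENGTH = 50;
    static final int NUM_RECORDS_UPPER_BOUND = 500;
    static final int NUM_RECORDS_LOWER_BOUND = 100;

    /** Hypothetical stand-in for the project's KeyValue<Integer, String>. */
    static final class KeyValue {
        final int key;
        final String value;

        KeyValue(int key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof KeyValue)) {
                return false;
            }
            KeyValue other = (KeyValue) o;
            return key == other.key && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }

    static List<KeyValue> generateTestData(long seed) {
        Random random = new Random(seed);
        // nextInt(bound) excludes bound: LOWER_BOUND .. UPPER_BOUND - 1 records.
        int numRecords =
                NUM_RECORDS_LOWER_BOUND
                        + random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND);
        List<KeyValue> records = new ArrayList<>(numRecords);
        for (int i = 0; i < numRecords; i++) {
            // 1 .. RANDOM_STRING_MAX_LENGTH lowercase ASCII letters.
            int length = 1 + random.nextInt(RANDOM_STRING_MAX_LENGTH);
            StringBuilder value = new StringBuilder(length);
            for (int j = 0; j < length; j++) {
                value.append((char) ('a' + random.nextInt(26)));
            }
            // In this sketch keys are simply the record positions 0..n-1.
            records.add(new KeyValue(i, value.toString()));
        }
        return records;
    }

    public static void main(String[] args) {
        List<KeyValue> first = generateTestData(42L);
        List<KeyValue> replay = generateTestData(42L);
        System.out.println(first.size() + " records, identical on replay: " + first.equals(replay));
    }
}
```

   Calling `generateTestData` twice with the same seed yields equal lists, while a different seed gives different data; a `Random` that is created but never consulted would lose that property.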



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName);

Review Comment:
   Please always include the exception when logging. This affects several places in this class.
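   The request applies to calls such as `LOG.error("Cannot refresh index {}", indexName)`, which drop the caught exception. With SLF4J, appending `e` after the placeholder arguments, as `deleteIndex` already does with `LOG.error("Cannot delete index {}", indexName, e)`, makes the logger treat it as the exception and print its stack trace. The sketch below shows the same idea with `java.util.logging` instead of SLF4J so that it runs with only the JDK; `refreshIndex` here is a hypothetical stand-in that always fails, and an in-memory handler checks that the exception is attached to the log record.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogWithCause {

    private static final Logger LOG = Logger.getLogger(LogWithCause.class.getName());

    /** Keeps published records in memory so the attached Throwable can be inspected. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    /** Hypothetical stand-in for a client call that fails with an I/O error. */
    static void refreshIndex(String indexName) throws IOException {
        throw new IOException("Connection refused while refreshing " + indexName);
    }

    static void refreshQuietly(String indexName) {
        try {
            refreshIndex(indexName);
        } catch (IOException e) {
            // Hand the exception to the logger instead of logging only the message,
            // so the cause and stack trace end up in the log.
            LOG.log(Level.SEVERE, "Cannot refresh index " + indexName, e);
        }
    }

    /** Runs refreshQuietly with a capturing handler attached and returns what was logged. */
    static List<LogRecord> captureRefresh(String indexName) {
        CapturingHandler handler = new CapturingHandler();
        LOG.setUseParentHandlers(false); // keep the demo quiet on stderr
        LOG.addHandler(handler);
        try {
            refreshQuietly(indexName);
        } finally {
            LOG.removeHandler(handler);
            LOG.setUseParentHandlers(true);
        }
        return handler.records;
    }

    public static void main(String[] args) {
        LogRecord record = captureRefresh("es-index-1").get(0);
        System.out.println(record.getMessage() + " | thrown: " + record.getThrown());
    }
}
```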





[GitHub] [flink] afedulov commented on pull request #19405: [FLINK-27066] Reintroduce e2e tests in ES as Java tests.

Posted by GitBox <gi...@apache.org>.
afedulov commented on PR #19405:
URL: https://github.com/apache/flink/pull/19405#issuecomment-1111962794

   Resolved the conflicts.

