You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/13 14:00:47 UTC

[flink-connector-elasticsearch] branch main updated (19dfe58 -> 06e38d1)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


    from 19dfe58  [hotfix] Remove usage of deprecated RestHighLevelClient#bulkAsync
     new 86decf0  [hotfix][build] Rat ignores japicmp output
     new 6823b86  [hotfix][ci] Upgrade Maven to 3.8.6
     new afd3784  [hotfix][build] Fix flink-annotations version
     new a9328df  [FLINK-28410][tests] Sync E2E tests
     new b793850  [hotfix][build] Remove unnecessary surefire config
     new 06e38d1  [hotfix][ci] Hide Maven download transfer

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml                           |  17 +-
 .../pom.xml                                        |  77 +++++++++
 .../flink/streaming/tests/ElasticsearchClient.java |  59 +++++++
 .../streaming/tests/ElasticsearchDataReader.java   |  57 +++++++
 .../tests/ElasticsearchSinkE2ECaseBase.java        | 101 ++++++++++++
 .../ElasticsearchSinkExternalContextBase.java      | 123 +++++++++++++++
 ...lasticsearchSinkExternalContextFactoryBase.java |  59 +++++++
 .../streaming/tests/ElasticsearchTestEmitter.java  |  51 ++++++
 .../org/apache/flink/streaming/tests/KeyValue.java |  92 +++++++++++
 .../apache/flink/streaming/tests/QueryParams.java  | 174 +++++++++++++++++++++
 .../streaming/tests/UpdateRequestFactory.java      |  43 +++++
 .../flink/test/parameters/ParameterProperty.java   |  58 +++++++
 .../org/apache/flink/tests/util/TestUtils.java     |  85 ++++++++++
 .../pom.xml                                        | 120 ++++++++++++++
 .../streaming/tests/Elasticsearch6Client.java      | 149 ++++++++++++++++++
 .../streaming/tests/UpdateRequest6Factory.java     |  48 ++++++
 .../streaming/tests/Elasticsearch6SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch6SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch6SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  24 +--
 .../pom.xml                                        | 121 ++++++++++++++
 .../streaming/tests/Elasticsearch7Client.java      | 147 +++++++++++++++++
 .../streaming/tests/UpdateRequest7Factory.java     |  46 ++++++
 .../streaming/tests/Elasticsearch7SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch7SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch7SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  23 ++-
 flink-connector-elasticsearch-e2e-tests/pom.xml    | 128 +++++++++++++++
 pom.xml                                            |  11 +-
 29 files changed, 2137 insertions(+), 28 deletions(-)
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
 copy {flink-connector-elasticsearch6 => flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests}/src/test/resources/log4j2-test.properties (60%)
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
 create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
 copy {flink-connector-elasticsearch6 => flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests}/src/test/resources/log4j2-test.properties (60%)
 create mode 100644 flink-connector-elasticsearch-e2e-tests/pom.xml


[flink-connector-elasticsearch] 04/06: [FLINK-28410][tests] Sync E2E tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit a9328dfcd7793b73cddc428d4c4a0aa827d220ca
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:42:47 2022 +0200

    [FLINK-28410][tests] Sync E2E tests
---
 .github/workflows/ci.yml                           |  13 +-
 .../pom.xml                                        |  77 +++++++++
 .../flink/streaming/tests/ElasticsearchClient.java |  59 +++++++
 .../streaming/tests/ElasticsearchDataReader.java   |  57 +++++++
 .../tests/ElasticsearchSinkE2ECaseBase.java        | 101 ++++++++++++
 .../ElasticsearchSinkExternalContextBase.java      | 123 +++++++++++++++
 ...lasticsearchSinkExternalContextFactoryBase.java |  59 +++++++
 .../streaming/tests/ElasticsearchTestEmitter.java  |  51 ++++++
 .../org/apache/flink/streaming/tests/KeyValue.java |  92 +++++++++++
 .../apache/flink/streaming/tests/QueryParams.java  | 174 +++++++++++++++++++++
 .../streaming/tests/UpdateRequestFactory.java      |  43 +++++
 .../flink/test/parameters/ParameterProperty.java   |  58 +++++++
 .../org/apache/flink/tests/util/TestUtils.java     |  85 ++++++++++
 .../pom.xml                                        | 120 ++++++++++++++
 .../streaming/tests/Elasticsearch6Client.java      | 149 ++++++++++++++++++
 .../streaming/tests/UpdateRequest6Factory.java     |  48 ++++++
 .../streaming/tests/Elasticsearch6SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch6SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch6SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  34 ++++
 .../pom.xml                                        | 121 ++++++++++++++
 .../streaming/tests/Elasticsearch7Client.java      | 147 +++++++++++++++++
 .../streaming/tests/UpdateRequest7Factory.java     |  46 ++++++
 .../streaming/tests/Elasticsearch7SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch7SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch7SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  35 +++++
 flink-connector-elasticsearch-e2e-tests/pom.xml    | 128 +++++++++++++++
 28 files changed, 2171 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c23d82c..9aff109 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,6 +26,7 @@ jobs:
         jdk: [8, 11]
     env:
       MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+      FLINK_URL: https://s3.amazonaws.com/flink-nightly/flink-1.16-SNAPSHOT-bin-scala_2.12.tgz
     steps:
       - run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}"
 
@@ -45,4 +46,14 @@ jobs:
           maven-version: 3.8.6
 
       - name: Compile and test flink-connector-elasticsearch
-        run: mvn clean install -Dscala-2.12 -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
+        run: |
+          pushd .. \
+            && wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz \
+            && popd
+          
+          mvn clean install -U -B \
+            -Dscala-2.12 \
+            -Prun-end-to-end-tests -DdistDir=$(pwd)/../flink-1.16-SNAPSHOT \
+            -Dflink.convergence.phase=install -Pcheck-convergence \
+            ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
new file mode 100644
index 0000000..b7b6e1d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+	<name>Flink : Connectors : Elasticsearch : E2E tests : Common</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${flink.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${testcontainers.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter</artifactId>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
new file mode 100644
index 0000000..ad97915
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import java.util.List;
+
+/** The version-agnostic Elasticsearch client interface. */
+public interface ElasticsearchClient {
+
+    /**
+     * Delete the index.
+     *
+     * @param indexName The index name.
+     */
+    void deleteIndex(String indexName);
+
+    /**
+     * Refresh the index.
+     *
+     * @param indexName The index name.
+     */
+    void refreshIndex(String indexName);
+
+    /**
+     * Create index if it does not exist.
+     *
+     * @param indexName The index name.
+     * @param shards The number of shards.
+     * @param replicas The number of replicas.
+     */
+    void createIndexIfDoesNotExist(String indexName, int shards, int replicas);
+
+    /** Close the client. @throws Exception The exception. */
+    void close() throws Exception;
+
+    /**
+     * Fetch all results from the index.
+     *
+     * @param params The parameters of the query.
+     * @return All documents from the index.
+     */
+    List<KeyValue<Integer, String>> fetchAll(QueryParams params);
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
new file mode 100644
index 0000000..165ca6d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        QueryParams params =
+                QueryParams.newBuilder(indexName)
+                        .sortField("key")
+                        .pageLength(pageLength)
+                        .trackTotalHits(true)
+                        .build();
+        return client.fetchAll(params);
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
new file mode 100644
index 0000000..b6f4a9f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** Base classs for end to end ElasticsearchSink tests based on connector testing framework. */
+@SuppressWarnings("unused")
+public abstract class ElasticsearchSinkE2ECaseBase<T extends Comparable<T>>
+        extends SinkTestSuiteBase<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkE2ECaseBase.class);
+    private static final int READER_RETRY_ATTEMPTS = 10;
+    private static final int READER_TIMEOUT = -1; // Not used
+
+    protected static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv
+    protected FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(getElasticsearchContainerName()))
+                                    .withEnv(
+                                            "cluster.routing.allocation.disk.threshold_enabled",
+                                            "false")
+                                    .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    @Override
+    protected void checkResultWithSemantic(
+            ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+            throws Exception {
+        waitUntilCondition(
+                () -> {
+                    try {
+                        List<T> result = reader.poll(Duration.ofMillis(READER_TIMEOUT));
+                        assertThat(sort(result).iterator())
+                                .matchesRecordsFromSource(
+                                        Collections.singletonList(sort(testData)), semantic);
+                        return true;
+                    } catch (Throwable t) {
+                        LOG.warn("Polled results not as expected", t);
+                        return false;
+                    }
+                },
+                5000,
+                READER_RETRY_ATTEMPTS);
+    }
+
+    private List<T> sort(List<T> list) {
+        return list.stream().sorted().collect(Collectors.toList());
+    }
+
+    abstract String getElasticsearchContainerName();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
new file mode 100644
index 0000000..598cf43
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context. */
+abstract class ElasticsearchSinkExternalContextBase
+        implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> {
+    /** The constant INDEX_NAME_PREFIX. */
+    protected static final String INDEX_NAME_PREFIX = "es-index";
+
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    protected static final int BULK_BUFFER = 100;
+    protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1;
+    /** The index name. */
+    protected final String indexName;
+
+    /** The address reachable from Flink (internal to the testing environment). */
+    protected final String addressInternal;
+
+    /** The connector jar paths. */
+    protected final List<URL> connectorJarPaths;
+
+    /** The client. */
+    protected final ElasticsearchClient client;
+
+    /**
+     * Instantiates a new Elasticsearch sink context base.
+     *
+     * @param addressInternal The address to access Elasticsearch from within Flink. When running in
+     *     a containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     * @param client The Elasticsearch client.
+     */
+    ElasticsearchSinkExternalContextBase(
+            String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) {
+        this.addressInternal = checkNotNull(addressInternal);
+        this.connectorJarPaths = checkNotNull(connectorJarPaths);
+        this.client = checkNotNull(client);
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        return IntStream.range(0, recordNum)
+                .boxed()
+                .map(
+                        i -> {
+                            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+                            String value = RandomStringUtils.random(valueLength, true, true);
+                            return KeyValue.of(i, value);
+                        })
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void close() {
+        client.deleteIndex(indexName);
+    }
+
+    @Override
+    public List<URL> getConnectorJarPaths() {
+        return connectorJarPaths;
+    }
+
+    @Override
+    public TypeInformation<KeyValue<Integer, String>> getProducedType() {
+        return TypeInformation.of(new TypeHint<KeyValue<Integer, String>>() {});
+    }
+
+    @Override
+    public abstract Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings);
+
+    @Override
+    public abstract ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings);
+
+    @Override
+    public abstract String toString();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
new file mode 100644
index 0000000..0da9b2b
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalContext;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context factory base. */
+public abstract class ElasticsearchSinkExternalContextFactoryBase<T extends ExternalContext>
+        implements ExternalContextFactory<T> {
+
+    /** The Elasticsearch container. */
+    protected final ElasticsearchContainer elasticsearchContainer;
+
+    /** The connector jars. */
+    protected final List<URL> connectorJars;
+
+    /**
+     * Instantiates a new Elasticsearch sink context factory.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    ElasticsearchSinkExternalContextFactoryBase(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        this.elasticsearchContainer = checkNotNull(elasticsearchContainer);
+        this.connectorJars = checkNotNull(connectorJars);
+    }
+
+    protected static String formatInternalAddress(
+            GenericContainer<ElasticsearchContainer> container) {
+        return String.format(
+                "%s:%d", container.getNetworkAliases().get(0), container.getExposedPorts().get(0));
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
new file mode 100644
index 0000000..7a5c36c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test emitter for performing ElasticSearch indexing requests. */
+public class ElasticsearchTestEmitter implements ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final UpdateRequestFactory factory;
+
+    /**
+     * Instantiates a new Elasticsearch test emitter.
+     *
+     * @param factory The factory for creating {@link UpdateRequest}s.
+     */
+    public ElasticsearchTestEmitter(UpdateRequestFactory factory) {
+        this.factory = checkNotNull(factory);
+    }
+
+    @Override
+    public void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        UpdateRequest updateRequest = factory.createUpdateRequest(element);
+        indexer.add(updateRequest);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
new file mode 100644
index 0000000..43db0c4
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {
+        int d = this.key.compareTo(other.key);
+        if (d == 0) {
+            return this.value.compareTo(other.value);
+        }
+        return d;
+    }
+
+    /** Creates a new key-value pair. */
+    public static <K extends Comparable<? super K>, T1 extends Comparable<? super T1>>
+            KeyValue<K, T1> of(K key, T1 value) {
+        return new KeyValue<>(key, value);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof KeyValue)) {
+            return false;
+        }
+        @SuppressWarnings("rawtypes")
+        KeyValue keyValue = (KeyValue) o;
+        if (key != null ? !key.equals(keyValue.key) : keyValue.key != null) {
+            return false;
+        }
+        if (value != null ? !value.equals(keyValue.value) : keyValue.value != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, value);
+    }
+
+    @Override
+    public String toString() {
+        return "("
+                + StringUtils.arrayAwareToString(this.key)
+                + ","
+                + StringUtils.arrayAwareToString(this.value)
+                + ")";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
new file mode 100644
index 0000000..22cf7cf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Holder class for Elasticsearch query parameters. */
+public class QueryParams {
+    private final String indexName;
+    private final String sortField;
+    private final int from;
+    private final int pageLength;
+    private final boolean trackTotalHits;
+
+    private QueryParams(Builder builder) {
+        indexName = builder.indexName;
+        sortField = builder.sortField;
+        from = builder.from;
+        pageLength = builder.pageLength;
+        trackTotalHits = builder.trackTotalHits;
+    }
+
+    /**
+     * New {@code QueryParams} builder.
+     *
+     * @param indexName The index name. This parameter is mandatory.
+     * @return The builder.
+     */
+    public static Builder newBuilder(String indexName) {
+        return new Builder(indexName);
+    }
+
+    /** {@code QueryParams} builder static inner class. */
+    public static final class Builder {
+        private String sortField;
+        private int from;
+        private int pageLength;
+        private boolean trackTotalHits;
+        private String indexName;
+
+        private Builder(String indexName) {
+            this.indexName = checkNotNull(indexName);
+        }
+
+        /**
+         * Sets the {@code sortField} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param sortField The {@code sortField} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder sortField(String sortField) {
+            this.sortField = checkNotNull(sortField);
+            return this;
+        }
+
+        /**
+         * Sets the {@code from} and returns a reference to this Builder enabling method chaining.
+         *
+         * @param from The {@code from} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder from(int from) {
+            this.from = from;
+            return this;
+        }
+
+        /**
+         * Sets the {@code pageLength} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param pageLength The {@code pageLength} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder pageLength(int pageLength) {
+            this.pageLength = pageLength;
+            return this;
+        }
+
+        /**
+         * Sets the {@code trackTotalHits} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param trackTotalHits The {@code trackTotalHits} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder trackTotalHits(boolean trackTotalHits) {
+            this.trackTotalHits = trackTotalHits;
+            return this;
+        }
+
+        /**
+         * Returns a {@code QueryParams} built from the parameters previously set.
+         *
+         * @return A {@code QueryParams} built with parameters of this {@code QueryParams.Builder}
+         */
+        public QueryParams build() {
+            return new QueryParams(this);
+        }
+
+        /**
+         * Sets the {@code indexName} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * @param indexName The {@code indexName} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder indexName(String indexName) {
+            this.indexName = checkNotNull(indexName);
+            return this;
+        }
+    }
+
+    /**
+     * Sort field string.
+     *
+     * @return The string.
+     */
+    public String sortField() {
+        return sortField;
+    }
+
+    /**
+     * From index to start the search from. Defaults to {@code 0}.
+     *
+     * @return The int.
+     */
+    public int from() {
+        return from;
+    }
+
+    /**
+     * Page length int.
+     *
+     * @return The int.
+     */
+    public int pageLength() {
+        return pageLength;
+    }
+
+    /**
+     * Track total hits boolean.
+     *
+     * @return The boolean.
+     */
+    public boolean trackTotalHits() {
+        return trackTotalHits;
+    }
+
+    /**
+     * Index name string.
+     *
+     * @return The string.
+     */
+    public String indexName() {
+        return indexName;
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
new file mode 100644
index 0000000..d7b8bb7
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Factory for creating UpdateRequests. */
+public interface UpdateRequestFactory extends Serializable {
+    UpdateRequest createUpdateRequest(KeyValue<Integer, String> element);
+
+    /**
+     * Utility to convert {@link KeyValue} elements into Elasticsearch-compatible format.
+     *
+     * @param element The element to be converted.
+     * @return The map with the element's fields.
+     */
+    static Map<String, Object> prepareDoc(KeyValue<Integer, String> element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("key", element.key);
+        json.put("value", element.value);
+        return json;
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
new file mode 100644
index 0000000..a2bcfdf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.parameters;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** System-property based parameters for tests and resources. */
+public class ParameterProperty<V> {
+
+    private final String propertyName;
+    private final Function<String, V> converter;
+
+    public ParameterProperty(final String propertyName, final Function<String, V> converter) {
+        this.propertyName = propertyName;
+        this.converter = converter;
+    }
+
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    /**
+     * Retrieves the value of this property.
+     *
+     * @return Optional containing the value of this property
+     */
+    public Optional<V> get() {
+        final String value = System.getProperty(propertyName);
+        return value == null ? Optional.empty() : Optional.of(converter.apply(value));
+    }
+
+    /**
+     * Retrieves the value of this property, or the given default if no value was set.
+     *
+     * @return the value of this property, or the given default if no value was set
+     */
+    public V get(final V defaultValue) {
+        final String value = System.getProperty(propertyName);
+        return value == null ? defaultValue : converter.apply(value);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
new file mode 100644
index 0000000..980aaa9
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.test.parameters.ParameterProperty;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** General test utilities. */
+public enum TestUtils {
+    ;
+
+    private static final ParameterProperty<Path> MODULE_DIRECTORY =
+            new ParameterProperty<>("moduleDir", Paths::get);
+
+    /**
+     * Searches for a resource file matching the given regex in the given directory. This method is
+     * primarily intended to be used for the initialization of static {@link Path} fields for
+     * resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
+     *
+     * @param resourceNameRegex regex pattern to match against
+     * @return Path pointing to the matching jar
+     * @throws RuntimeException if none or multiple resource files could be found
+     */
+    public static Path getResource(final String resourceNameRegex) {
+        // if the property is not set then we are most likely running in the IDE, where the working
+        // directory is the
+        // module of the test that is currently running, which is exactly what we want
+        Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+
+        try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
+            final List<Path> matchingResources =
+                    dependencyResources
+                            .filter(
+                                    jar ->
+                                            Pattern.compile(resourceNameRegex)
+                                                    .matcher(jar.toAbsolutePath().toString())
+                                                    .find())
+                            .collect(Collectors.toList());
+            switch (matchingResources.size()) {
+                case 0:
+                    throw new RuntimeException(
+                            new FileNotFoundException(
+                                    String.format(
+                                            "No resource file could be found that matches the pattern %s. "
+                                                    + "This could mean that the test module must be rebuilt via maven.",
+                                            resourceNameRegex)));
+                case 1:
+                    return matchingResources.get(0);
+                default:
+                    throw new RuntimeException(
+                            new IOException(
+                                    String.format(
+                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
+                                            resourceNameRegex, matchingResources)));
+            }
+        } catch (final IOException ioe) {
+            throw new RuntimeException("Could not search for resource resource files.", ioe);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
new file mode 100644
index 0000000..c7d706f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch6-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch 6 : E2E tests</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>elasticsearch6-end-to-end-test</finalName>
+							<outputDirectory>dependencies</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${flink.version}</version>
+							<destFileName>flink-connector-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
new file mode 100644
index 0000000..b2c80fc
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            LOG.error("Fetching records failed", e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
new file mode 100644
index 0000000..9659913
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** Factory for creating UpdateRequests of Elasticsearch6. */
+public class UpdateRequest6Factory implements UpdateRequestFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String indexName;
+
+    /**
+     * Instantiates a new update request factory for of Elasticsearch6.
+     *
+     * @param indexName The index name.
+     */
+    public UpdateRequest6Factory(String indexName) {
+        this.indexName = indexName;
+    }
+
+    @Override
+    public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+        Map<String, Object> json = UpdateRequestFactory.prepareDoc(element);
+        return new UpdateRequest(indexName, "doc", String.valueOf(element.key))
+                .doc(json)
+                .upsert(json);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
new file mode 100644
index 0000000..cc95275
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch6Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase
+        extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6SinkE2ECase.class);
+
+    public Elasticsearch6SinkE2ECase() throws Exception {}
+
+    String getElasticsearchContainerName() {
+        return DockerImageVersions.ELASTICSEARCH_6;
+    }
+
+    @TestContext
+    Elasticsearch6SinkExternalContextFactory contextFactory =
+            new Elasticsearch6SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("dependencies/elasticsearch6-end-to-end-test.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource(
+                                            "dependencies/flink-connector-elasticsearch-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()));
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
new file mode 100644
index 0000000..1ccbcd2
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch6SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+    /**
+     * Instantiates a new Elasticsearch 6 sink context base.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+     *     containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     */
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        super(addressInternal, connectorJarPaths, new Elasticsearch6Client(addressExternal));
+    }
+
+    @Override
+    public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+        client.createIndexIfDoesNotExist(indexName, 1, 0);
+        return new Elasticsearch6SinkBuilder<KeyValue<Integer, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest6Factory(indexName)))
+                .setBulkFlushMaxActions(BULK_BUFFER)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        return "Elasticsearch 6 sink context.";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
new file mode 100644
index 0000000..690fbd5
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Elasticsearch sink external context factory. */
+class Elasticsearch6SinkExternalContextFactory
+        extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch6SinkExternalContext> {
+
+    /**
+     * Instantiates a new Elasticsearch 6 sink external context factory.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    Elasticsearch6SinkExternalContextFactory(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        super(elasticsearchContainer, connectorJars);
+    }
+
+    @Override
+    public Elasticsearch6SinkExternalContext createExternalContext(String testName) {
+        return new Elasticsearch6SinkExternalContext(
+                elasticsearchContainer.getHttpHostAddress(),
+                formatInternalAddress(elasticsearchContainer),
+                connectorJars);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..4da8bba
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# It is recommended to uncomment these lines when enabling the logger. The below package used
+# by testcontainers is quite verbose
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
new file mode 100644
index 0000000..9e020f1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch7-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch 7 : E2E Tests</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>elasticsearch7-end-to-end-test</finalName>
+							<outputDirectory>dependencies</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${flink.version}</version>
+							<destFileName>flink-connector-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
new file mode 100644
index 0000000..7faba25
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            LOG.error("Fetching records failed", e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
new file mode 100644
index 0000000..875d09a
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** Factory for creating UpdateRequests of Elasticsearch7. */
+public class UpdateRequest7Factory implements UpdateRequestFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String indexName;
+
+    /**
+     * Instantiates a new update request factory for of Elasticsearch7.
+     *
+     * @param indexName The index name.
+     */
+    public UpdateRequest7Factory(String indexName) {
+        this.indexName = indexName;
+    }
+
+    @Override
+    public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+        Map<String, Object> json = UpdateRequestFactory.prepareDoc(element);
+        return new UpdateRequest(indexName, String.valueOf(element.key)).doc(json).upsert(json);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
new file mode 100644
index 0000000..59be31c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch7Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch7SinkE2ECase
+        extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SinkE2ECase.class);
+
+    public Elasticsearch7SinkE2ECase() throws Exception {}
+
+    String getElasticsearchContainerName() {
+        return DockerImageVersions.ELASTICSEARCH_7;
+    }
+
+    @TestContext
+    Elasticsearch7SinkExternalContextFactory contextFactory =
+            new Elasticsearch7SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("dependencies/elasticsearch7-end-to-end-test.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource(
+                                            "dependencies/flink-connector-elasticsearch-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()));
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
new file mode 100644
index 0000000..aa31ea0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch7SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+    /**
+     * Instantiates a new Elasticsearch 7 sink context base.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+     *     containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     */
+    Elasticsearch7SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        super(addressInternal, connectorJarPaths, new Elasticsearch7Client(addressExternal));
+    }
+
+    @Override
+    public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+        client.createIndexIfDoesNotExist(indexName, 1, 0);
+        return new Elasticsearch7SinkBuilder<KeyValue<Integer, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest7Factory(indexName)))
+                .setBulkFlushMaxActions(BULK_BUFFER)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        return "Elasticsearch 7 sink context.";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
new file mode 100644
index 0000000..c0e3f41
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Elasticsearch sink external context factory. */
+class Elasticsearch7SinkExternalContextFactory
+        extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch7SinkExternalContext> {
+
+    /**
+     * Instantiates a new Elasticsearch 7 sink external context factory.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    Elasticsearch7SinkExternalContextFactory(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        super(elasticsearchContainer, connectorJars);
+    }
+
+    @Override
+    public Elasticsearch7SinkExternalContext createExternalContext(String testName) {
+        return new Elasticsearch7SinkExternalContext(
+                elasticsearchContainer.getHttpHostAddress(),
+                formatInternalAddress(elasticsearchContainer),
+                connectorJars);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e48d6c0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# It is recommended to uncomment these lines when enabling the logger. The below package used
+# by testcontainers is quite verbose
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
+
diff --git a/flink-connector-elasticsearch-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/pom.xml
new file mode 100644
index 0000000..87756e1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-parent</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<packaging>pom</packaging>
+
+	<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch : E2E Tests</name>
+
+	<modules>
+		<module>flink-connector-elasticsearch-e2e-tests-common</module>
+		<module>flink-connector-elasticsearch6-e2e-tests</module>
+		<module>flink-connector-elasticsearch7-e2e-tests</module>
+    </modules>
+
+	<profiles>
+		<profile>
+			<id>run-end-to-end-tests</id>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<executions>
+							<execution>
+								<id>end-to-end-tests</id>
+								<phase>integration-test</phase>
+								<goals>
+									<goal>test</goal>
+								</goals>
+								<configuration>
+									<includes>
+										<include>**/*.*</include>
+									</includes>
+									<!-- E2E tests must not access flink-dist concurrently. -->
+									<forkCount>1</forkCount>
+									<systemPropertyVariables>
+										<moduleDir>${project.basedir}</moduleDir>
+									</systemPropertyVariables>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-deploy-plugin</artifactId>
+				<configuration>
+					<skip>true</skip>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>default-test</id>
+						<phase>none</phase>
+					</execution>
+					<execution>
+						<id>integration-tests</id>
+						<phase>none</phase>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-shade-plugin</artifactId>
+					<configuration>
+						<artifactSet>
+							<excludes combine.children="append">
+								<exclude>com.google.code.findbugs:jsr305</exclude>
+								<exclude>org.slf4j:slf4j-api</exclude>
+							</excludes>
+						</artifactSet>
+						<filters combine.children="append">
+							<filter>
+								<artifact>*:*</artifact>
+								<excludes>
+									<exclude>META-INF/*.SF</exclude>
+									<exclude>META-INF/*.DSA</exclude>
+									<exclude>META-INF/*.RSA</exclude>
+								</excludes>
+							</filter>
+						</filters>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+	
+</project>


[flink-connector-elasticsearch] 06/06: [hotfix][ci] Hide Maven download transfer

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit 06e38d1e2d8d998e06065c011098f85d0235860c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 14:34:23 2022 +0200

    [hotfix][ci] Hide Maven download transfer
---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9aff109..f4affdd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -51,7 +51,7 @@ jobs:
             && wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz \
             && popd
           
-          mvn clean install -U -B \
+          mvn clean install -U -B --no-transfer-progress \
             -Dscala-2.12 \
             -Prun-end-to-end-tests -DdistDir=$(pwd)/../flink-1.16-SNAPSHOT \
             -Dflink.convergence.phase=install -Pcheck-convergence \


[flink-connector-elasticsearch] 05/06: [hotfix][build] Remove unnecessary surefire config

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit b793850a2c7d3717dfc25c72bd0b5a59ff856442
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 13:05:04 2022 +0200

    [hotfix][build] Remove unnecessary surefire config
---
 pom.xml | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 01efb89..e0336fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -427,13 +427,6 @@ under the License.
 								</dependency>
 							</dependencies>
 						</plugin>
-						<plugin>
-							<groupId>org.apache.maven.plugins</groupId>
-							<artifactId>maven-surefire-plugin</artifactId>
-							<configuration>
-								<excludedGroups>org.apache.flink.testutils.junit.FailsOnJava11</excludedGroups>
-							</configuration>
-						</plugin>
 						<plugin>
 							<groupId>org.apache.maven.plugins</groupId>
 							<artifactId>maven-javadoc-plugin</artifactId>


[flink-connector-elasticsearch] 02/06: [hotfix][ci] Upgrade Maven to 3.8.6

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit 6823b8656c896dffa6dc8b132762733c5ac44b64
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:32:43 2022 +0200

    [hotfix][ci] Upgrade Maven to 3.8.6
---
 .github/workflows/ci.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3700352..c23d82c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -39,10 +39,10 @@ jobs:
           distribution: 'temurin'
           cache: 'maven'
 
-      - name: Set Maven 3.8.5
+      - name: Set Maven 3.8.6
         uses: stCarolas/setup-maven@v4.2
         with:
-          maven-version: 3.8.5
+          maven-version: 3.8.6
 
       - name: Compile and test flink-connector-elasticsearch
         run: mvn clean install -Dscala-2.12 -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties


[flink-connector-elasticsearch] 03/06: [hotfix][build] Fix flink-annotations version

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit afd378460a58d4064b6ae12a3fc17459bcb0923f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:42:14 2022 +0200

    [hotfix][build] Fix flink-annotations version
---
 pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 0a840e6..01efb89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-connector-elasticsearch-base</module>
 		<module>flink-connector-elasticsearch6</module>
 		<module>flink-connector-elasticsearch7</module>
+		<module>flink-connector-elasticsearch-e2e-tests</module>
 	</modules>
 
 	<properties>
@@ -1314,7 +1315,7 @@ under the License.
 							<dependency>
 								<groupId>org.apache.flink</groupId>
 								<artifactId>flink-annotations</artifactId>
-								<version>${project.version}</version>
+								<version>${flink.version}</version>
 							</dependency>
 						</dependencies>
 					</configuration>


[flink-connector-elasticsearch] 01/06: [hotfix][build] Rat ignores japicmp output

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit 86decf0f5326559b236e391aa245adf95ae334b4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:09:57 2022 +0200

    [hotfix][build] Rat ignores japicmp output
---
 pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pom.xml b/pom.xml
index 49acc98..0a840e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -830,6 +830,7 @@ under the License.
 						<!-- Build files -->
 						<exclude>**/*.iml</exclude>
 						<!-- Generated content -->
+						<exclude>tools/japicmp-output/**</exclude>
 						<exclude>out/**</exclude>
 						<exclude>**/target/**</exclude>
 						<exclude>docs/layouts/shortcodes/generated/**</exclude>