You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/13 14:00:51 UTC

[flink-connector-elasticsearch] 04/06: [FLINK-28410][tests] Sync E2E tests

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit a9328dfcd7793b73cddc428d4c4a0aa827d220ca
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:42:47 2022 +0200

    [FLINK-28410][tests] Sync E2E tests
---
 .github/workflows/ci.yml                           |  13 +-
 .../pom.xml                                        |  77 +++++++++
 .../flink/streaming/tests/ElasticsearchClient.java |  59 +++++++
 .../streaming/tests/ElasticsearchDataReader.java   |  57 +++++++
 .../tests/ElasticsearchSinkE2ECaseBase.java        | 101 ++++++++++++
 .../ElasticsearchSinkExternalContextBase.java      | 123 +++++++++++++++
 ...lasticsearchSinkExternalContextFactoryBase.java |  59 +++++++
 .../streaming/tests/ElasticsearchTestEmitter.java  |  51 ++++++
 .../org/apache/flink/streaming/tests/KeyValue.java |  92 +++++++++++
 .../apache/flink/streaming/tests/QueryParams.java  | 174 +++++++++++++++++++++
 .../streaming/tests/UpdateRequestFactory.java      |  43 +++++
 .../flink/test/parameters/ParameterProperty.java   |  58 +++++++
 .../org/apache/flink/tests/util/TestUtils.java     |  85 ++++++++++
 .../pom.xml                                        | 120 ++++++++++++++
 .../streaming/tests/Elasticsearch6Client.java      | 149 ++++++++++++++++++
 .../streaming/tests/UpdateRequest6Factory.java     |  48 ++++++
 .../streaming/tests/Elasticsearch6SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch6SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch6SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  34 ++++
 .../pom.xml                                        | 121 ++++++++++++++
 .../streaming/tests/Elasticsearch7Client.java      | 147 +++++++++++++++++
 .../streaming/tests/UpdateRequest7Factory.java     |  46 ++++++
 .../streaming/tests/Elasticsearch7SinkE2ECase.java |  60 +++++++
 .../tests/Elasticsearch7SinkExternalContext.java   |  68 ++++++++
 .../Elasticsearch7SinkExternalContextFactory.java  |  48 ++++++
 .../src/test/resources/log4j2-test.properties      |  35 +++++
 flink-connector-elasticsearch-e2e-tests/pom.xml    | 128 +++++++++++++++
 28 files changed, 2171 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c23d82c..9aff109 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,6 +26,7 @@ jobs:
         jdk: [8, 11]
     env:
       MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+      FLINK_URL: https://s3.amazonaws.com/flink-nightly/flink-1.16-SNAPSHOT-bin-scala_2.12.tgz
     steps:
       - run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}"
 
@@ -45,4 +46,14 @@ jobs:
           maven-version: 3.8.6
 
       - name: Compile and test flink-connector-elasticsearch
-        run: mvn clean install -Dscala-2.12 -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
+        run: |
+          pushd .. \
+            && wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz \
+            && popd
+          
+          mvn clean install -U -B \
+            -Dscala-2.12 \
+            -Prun-end-to-end-tests -DdistDir=$(pwd)/../flink-1.16-SNAPSHOT \
+            -Dflink.convergence.phase=install -Pcheck-convergence \
+            ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
new file mode 100644
index 0000000..b7b6e1d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+	<name>Flink : Connectors : Elasticsearch : E2E tests : Common</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${flink.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${testcontainers.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter</artifactId>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
new file mode 100644
index 0000000..ad97915
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import java.util.List;
+
+/** The version-agnostic Elasticsearch client interface. */
+public interface ElasticsearchClient {
+
+    /**
+     * Delete the index.
+     *
+     * @param indexName The index name.
+     */
+    void deleteIndex(String indexName);
+
+    /**
+     * Refresh the index.
+     *
+     * @param indexName The index name.
+     */
+    void refreshIndex(String indexName);
+
+    /**
+     * Create index if it does not exist.
+     *
+     * @param indexName The index name.
+     * @param shards The number of shards.
+     * @param replicas The number of replicas.
+     */
+    void createIndexIfDoesNotExist(String indexName, int shards, int replicas);
+
+    /** Close the client. May throw an {@link Exception} if closing fails. */
+    void close() throws Exception;
+
+    /**
+     * Fetch all results from the index.
+     *
+     * @param params The parameters of the query.
+     * @return All documents from the index.
+     */
+    List<KeyValue<Integer, String>> fetchAll(QueryParams params);
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
new file mode 100644
index 0000000..165ca6d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+        implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+    private final ElasticsearchClient client;
+    private final String indexName;
+    private final int pageLength;
+
+    public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+        this.client = checkNotNull(client);
+        this.indexName = checkNotNull(indexName);
+        this.pageLength = pageLength;
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> poll(Duration timeout) {
+        client.refreshIndex(indexName);
+        QueryParams params =
+                QueryParams.newBuilder(indexName)
+                        .sortField("key")
+                        .pageLength(pageLength)
+                        .trackTotalHits(true)
+                        .build();
+        return client.fetchAll(params);
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
new file mode 100644
index 0000000..b6f4a9f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** Base class for end-to-end ElasticsearchSink tests based on the connector testing framework. */
+@SuppressWarnings("unused")
+public abstract class ElasticsearchSinkE2ECaseBase<T extends Comparable<T>>
+        extends SinkTestSuiteBase<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkE2ECaseBase.class);
+    private static final int READER_RETRY_ATTEMPTS = 10;
+    private static final int READER_TIMEOUT = -1; // Not used
+
+    protected static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    // Defines TestEnvironment
+    @TestEnv
+    protected FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new ElasticsearchContainer(
+                                            DockerImageName.parse(getElasticsearchContainerName()))
+                                    .withEnv(
+                                            "cluster.routing.allocation.disk.threshold_enabled",
+                                            "false")
+                                    .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    @Override
+    protected void checkResultWithSemantic(
+            ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+            throws Exception {
+        waitUntilCondition(
+                () -> {
+                    try {
+                        List<T> result = reader.poll(Duration.ofMillis(READER_TIMEOUT));
+                        assertThat(sort(result).iterator())
+                                .matchesRecordsFromSource(
+                                        Collections.singletonList(sort(testData)), semantic);
+                        return true;
+                    } catch (Throwable t) {
+                        LOG.warn("Polled results not as expected", t);
+                        return false;
+                    }
+                },
+                5000,
+                READER_RETRY_ATTEMPTS);
+    }
+
+    private List<T> sort(List<T> list) {
+        return list.stream().sorted().collect(Collectors.toList());
+    }
+
+    abstract String getElasticsearchContainerName();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
new file mode 100644
index 0000000..598cf43
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context. */
+abstract class ElasticsearchSinkExternalContextBase
+        implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> {
+    /** Prefix of the index name; a random numeric suffix is appended per context instance. */
+    protected static final String INDEX_NAME_PREFIX = "es-index";
+
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    protected static final int BULK_BUFFER = 100;
+    protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1;
+    /** The index name. */
+    protected final String indexName;
+
+    /** The address reachable from Flink (internal to the testing environment). */
+    protected final String addressInternal;
+
+    /** The connector jar paths. */
+    protected final List<URL> connectorJarPaths;
+
+    /** The client. */
+    protected final ElasticsearchClient client;
+
+    /**
+     * Instantiates a new Elasticsearch sink context base.
+     *
+     * @param addressInternal The address to access Elasticsearch from within Flink. When running in
+     *     a containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     * @param client The Elasticsearch client.
+     */
+    ElasticsearchSinkExternalContextBase(
+            String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) {
+        this.addressInternal = checkNotNull(addressInternal);
+        this.connectorJarPaths = checkNotNull(connectorJarPaths);
+        this.client = checkNotNull(client);
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+
+        return IntStream.range(0, recordNum)
+                .boxed()
+                .map(
+                        i -> {
+                            int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+                            String value = RandomStringUtils.random(valueLength, true, true);
+                            return KeyValue.of(i, value);
+                        })
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void close() {
+        client.deleteIndex(indexName);
+    }
+
+    @Override
+    public List<URL> getConnectorJarPaths() {
+        return connectorJarPaths;
+    }
+
+    @Override
+    public TypeInformation<KeyValue<Integer, String>> getProducedType() {
+        return TypeInformation.of(new TypeHint<KeyValue<Integer, String>>() {});
+    }
+
+    @Override
+    public abstract Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings);
+
+    @Override
+    public abstract ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings);
+
+    @Override
+    public abstract String toString();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
new file mode 100644
index 0000000..0da9b2b
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalContext;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context factories. */
+public abstract class ElasticsearchSinkExternalContextFactoryBase<T extends ExternalContext>
+        implements ExternalContextFactory<T> {
+
+    /** The Elasticsearch container. */
+    protected final ElasticsearchContainer elasticsearchContainer;
+
+    /** The connector jars. */
+    protected final List<URL> connectorJars;
+
+    /**
+     * Instantiates a new Elasticsearch sink context factory.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    ElasticsearchSinkExternalContextFactoryBase(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        this.elasticsearchContainer = checkNotNull(elasticsearchContainer);
+        this.connectorJars = checkNotNull(connectorJars);
+    }
+
+    protected static String formatInternalAddress(
+            GenericContainer<ElasticsearchContainer> container) {
+        return String.format(
+                "%s:%d", container.getNetworkAliases().get(0), container.getExposedPorts().get(0));
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
new file mode 100644
index 0000000..7a5c36c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test emitter that turns key-value pairs into Elasticsearch update requests. */
+public class ElasticsearchTestEmitter implements ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final UpdateRequestFactory factory;
+
+    /**
+     * Instantiates a new Elasticsearch test emitter.
+     *
+     * @param factory The factory for creating {@link UpdateRequest}s.
+     */
+    public ElasticsearchTestEmitter(UpdateRequestFactory factory) {
+        this.factory = checkNotNull(factory);
+    }
+
+    @Override
+    public void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        UpdateRequest updateRequest = factory.createUpdateRequest(element);
+        indexer.add(updateRequest);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
new file mode 100644
index 0000000..43db0c4
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+        implements Comparable<KeyValue<K, V>>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The key of the key-value pair. */
+    public K key;
+    /** The value of the key-value pair. */
+    public V value;
+
+    /** Creates a new key-value pair where all fields are null. */
+    public KeyValue() {}
+
+    private KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public int compareTo(KeyValue<K, V> other) {
+        int d = this.key.compareTo(other.key);
+        if (d == 0) {
+            return this.value.compareTo(other.value);
+        }
+        return d;
+    }
+
+    /** Creates a new key-value pair. */
+    public static <K extends Comparable<? super K>, T1 extends Comparable<? super T1>>
+            KeyValue<K, T1> of(K key, T1 value) {
+        return new KeyValue<>(key, value);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof KeyValue)) {
+            return false;
+        }
+        @SuppressWarnings("rawtypes")
+        KeyValue keyValue = (KeyValue) o;
+        if (key != null ? !key.equals(keyValue.key) : keyValue.key != null) {
+            return false;
+        }
+        if (value != null ? !value.equals(keyValue.value) : keyValue.value != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, value);
+    }
+
+    @Override
+    public String toString() {
+        return "("
+                + StringUtils.arrayAwareToString(this.key)
+                + ","
+                + StringUtils.arrayAwareToString(this.value)
+                + ")";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
new file mode 100644
index 0000000..22cf7cf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Immutable holder for the parameters of a sorted, paged Elasticsearch search. */
+public class QueryParams {
+    private final String indexName;
+    private final String sortField;
+    private final int from;
+    private final int pageLength;
+    private final boolean trackTotalHits;
+
+    private QueryParams(Builder source) {
+        this.indexName = source.indexName;
+        this.sortField = source.sortField;
+        this.from = source.from;
+        this.pageLength = source.pageLength;
+        this.trackTotalHits = source.trackTotalHits;
+    }
+
+    /**
+     * Starts building a {@code QueryParams} for the given index.
+     *
+     * @param indexName The name of the index to query. This parameter is mandatory.
+     * @return A new builder.
+     */
+    public static Builder newBuilder(String indexName) {
+        return new Builder(indexName);
+    }
+
+    /**
+     * Returns the name of the index to query.
+     *
+     * @return The index name.
+     */
+    public String indexName() {
+        return indexName;
+    }
+
+    /**
+     * Returns the name of the field the hits are sorted by.
+     *
+     * @return The sort field, or {@code null} if none was set on the builder.
+     */
+    public String sortField() {
+        return sortField;
+    }
+
+    /**
+     * Returns the offset of the first hit to return. Defaults to {@code 0}.
+     *
+     * @return The offset.
+     */
+    public int from() {
+        return from;
+    }
+
+    /**
+     * Returns the number of hits per page. Defaults to {@code 0} if never set on the builder.
+     *
+     * @return The page length.
+     */
+    public int pageLength() {
+        return pageLength;
+    }
+
+    /**
+     * Returns whether the total number of hits is tracked. Defaults to {@code false}.
+     *
+     * @return {@code true} if total hits are tracked.
+     */
+    public boolean trackTotalHits() {
+        return trackTotalHits;
+    }
+
+    /** Builder for {@link QueryParams}; every setter returns the builder for chaining. */
+    public static final class Builder {
+        private String indexName;
+        private String sortField;
+        private int from;
+        private int pageLength;
+        private boolean trackTotalHits;
+
+        private Builder(String indexName) {
+            this.indexName = checkNotNull(indexName);
+        }
+
+        /**
+         * Replaces the index name this builder was created with. The value is passed through
+         * {@code checkNotNull} before it is stored.
+         *
+         * @param indexName The {@code indexName} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder indexName(String indexName) {
+            this.indexName = checkNotNull(indexName);
+            return this;
+        }
+
+        /**
+         * Sets the name of the field the hits are sorted by. The value is passed through
+         * {@code checkNotNull} before it is stored.
+         *
+         * @param sortField The {@code sortField} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder sortField(String sortField) {
+            this.sortField = checkNotNull(sortField);
+            return this;
+        }
+
+        /**
+         * Sets the offset of the first hit to return.
+         *
+         * @param from The {@code from} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder from(int from) {
+            this.from = from;
+            return this;
+        }
+
+        /**
+         * Sets the number of hits per page. If this is never called the page length stays at
+         * {@code 0}.
+         *
+         * @param pageLength The {@code pageLength} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder pageLength(int pageLength) {
+            this.pageLength = pageLength;
+            return this;
+        }
+
+        /**
+         * Sets whether the total number of hits is tracked. Tracking stays off unless this is
+         * called with {@code true}.
+         *
+         * @param trackTotalHits The {@code trackTotalHits} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder trackTotalHits(boolean trackTotalHits) {
+            this.trackTotalHits = trackTotalHits;
+            return this;
+        }
+
+        /**
+         * Creates a {@code QueryParams} from the values currently held by this builder.
+         *
+         * @return The new {@code QueryParams}.
+         */
+        public QueryParams build() {
+            return new QueryParams(this);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
new file mode 100644
index 0000000..d7b8bb7
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Serializable factory that creates an {@link UpdateRequest} for a {@link KeyValue} element. */
+public interface UpdateRequestFactory extends Serializable {
+    /** Creates the update request for the given element. */
+    UpdateRequest createUpdateRequest(KeyValue<Integer, String> element);
+
+    /**
+     * Converts the given {@link KeyValue} element into an Elasticsearch-compatible document.
+     *
+     * @return A new map holding the element's fields under {@code "key"} and {@code "value"}.
+     */
+    static Map<String, Object> prepareDoc(KeyValue<Integer, String> element) {
+        final Map<String, Object> document = new HashMap<>();
+        document.put("key", element.key);
+        document.put("value", element.value);
+        return document;
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
new file mode 100644
index 0000000..a2bcfdf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.parameters;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A typed test parameter whose value is read from a JVM system property. */
+public class ParameterProperty<V> {
+
+    private final String propertyName;
+    private final Function<String, V> converter;
+
+    /** Creates a parameter backed by the given system property and string converter. */
+    public ParameterProperty(final String propertyName, final Function<String, V> converter) {
+        this.propertyName = propertyName;
+        this.converter = converter;
+    }
+
+    /** Returns the name of the system property backing this parameter. */
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    /** Returns the converted property value, or an empty Optional if the property is not set. */
+    public Optional<V> get() {
+        final String raw = System.getProperty(propertyName);
+        if (raw == null) {
+            return Optional.empty();
+        }
+        return Optional.of(converter.apply(raw));
+    }
+
+    /** Returns the converted value, or {@code defaultValue} if the property is not set. */
+    public V get(final V defaultValue) {
+        final String raw = System.getProperty(propertyName);
+        if (raw != null) {
+            return converter.apply(raw);
+        }
+        return defaultValue;
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
new file mode 100644
index 0000000..980aaa9
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.test.parameters.ParameterProperty;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** General test utilities. */
+public enum TestUtils {
+    ;
+
+    private static final ParameterProperty<Path> MODULE_DIRECTORY =
+            new ParameterProperty<>("moduleDir", Paths::get);
+
+    /**
+     * Searches the module directory for the single file whose absolute path matches the given
+     * regex. Primarily intended for initializing static {@link Path} fields for resource files
+     * (e.g. jar, config file) that reside in the module's {@code target} directory.
+     *
+     * @param resourceNameRegex regex that must be found within the file's absolute path
+     * @return Path pointing to the matching resource
+     * @throws RuntimeException if none or multiple resource files could be found
+     */
+    public static Path getResource(final String resourceNameRegex) {
+        // If the property is not set we are most likely running in the IDE, where the working
+        // directory is the module of the currently running test, which is exactly what we want.
+        Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+
+        try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
+            final Pattern resourcePattern = Pattern.compile(resourceNameRegex);
+            final List<Path> matchingResources =
+                    dependencyResources
+                            .filter(
+                                    path ->
+                                            resourcePattern
+                                                    .matcher(path.toAbsolutePath().toString())
+                                                    .find())
+                            .collect(Collectors.toList());
+            switch (matchingResources.size()) {
+                case 0:
+                    throw new RuntimeException(
+                            new FileNotFoundException(
+                                    String.format(
+                                            "No resource file could be found that matches the pattern %s. "
+                                                    + "This could mean that the test module must be rebuilt via maven.",
+                                            resourceNameRegex)));
+                case 1:
+                    return matchingResources.get(0);
+                default:
+                    throw new RuntimeException(
+                            new IOException(
+                                    String.format(
+                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
+                                            resourceNameRegex, matchingResources)));
+            }
+        } catch (final IOException ioe) {
+            throw new RuntimeException("Could not search for resource files.", ioe);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
new file mode 100644
index 0000000..c7d706f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch6-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch 6 : E2E tests</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>6.8.20</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>elasticsearch6-end-to-end-test</finalName>
+							<outputDirectory>dependencies</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${flink.version}</version>
+							<destFileName>flink-connector-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
new file mode 100644
index 0000000..b2c80fc
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            LOG.error("Fetching records failed", e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
new file mode 100644
index 0000000..9659913
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** {@link UpdateRequestFactory} that creates the UpdateRequests for Elasticsearch6. */
+public class UpdateRequest6Factory implements UpdateRequestFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String indexName;
+
+    /**
+     * Instantiates a new update request factory for Elasticsearch6.
+     *
+     * @param indexName The name of the index the created requests are addressed to.
+     */
+    public UpdateRequest6Factory(String indexName) {
+        this.indexName = indexName;
+    }
+
+    @Override
+    public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+        final Map<String, Object> document = UpdateRequestFactory.prepareDoc(element);
+        final String documentId = String.valueOf(element.key);
+        // The same map is passed both as the update document and as the upsert document.
+        return new UpdateRequest(indexName, "doc", documentId).doc(document).upsert(document);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
new file mode 100644
index 0000000..cc95275
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch6Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase
+        extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6SinkE2ECase.class);
+
+    public Elasticsearch6SinkE2ECase() throws Exception {} // field initializer below can throw
+
+    String getElasticsearchContainerName() {
+        return DockerImageVersions.ELASTICSEARCH_6;
+    }
+
+    @TestContext
+    Elasticsearch6SinkExternalContextFactory contextFactory =
+            new Elasticsearch6SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("dependencies/elasticsearch6-end-to-end-test.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(), // jar named by this module's shade plugin config
+                            TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(), // copied by the maven-dependency-plugin
+                            TestUtils.getResource(
+                                            "dependencies/flink-connector-elasticsearch-test-utils.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL())); // e2e-tests-common jar, copied under this name
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
new file mode 100644
index 0000000..1ccbcd2
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch6SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+    /**
+     * Instantiates a new Elasticsearch 6 sink context base.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+     *     containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     */
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        super(addressInternal, connectorJarPaths, new Elasticsearch6Client(addressExternal));
+    }
+
+    @Override
+    public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+        client.createIndexIfDoesNotExist(indexName, 1, 0);
+        return new Elasticsearch6SinkBuilder<KeyValue<Integer, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest6Factory(indexName)))
+                .setBulkFlushMaxActions(BULK_BUFFER)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        return "Elasticsearch 6 sink context.";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
new file mode 100644
index 0000000..690fbd5
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Factory that creates {@link Elasticsearch6SinkExternalContext} instances. */
+class Elasticsearch6SinkExternalContextFactory
+        extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch6SinkExternalContext> {
+
+    /**
+     * Creates a factory bound to the given container and connector jars.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    Elasticsearch6SinkExternalContextFactory(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        super(elasticsearchContainer, connectorJars);
+    }
+
+    @Override
+    public Elasticsearch6SinkExternalContext createExternalContext(String testName) {
+        final String addressExternal = elasticsearchContainer.getHttpHostAddress();
+        final String addressInternal = formatInternalAddress(elasticsearchContainer);
+        return new Elasticsearch6SinkExternalContext(
+                addressExternal, addressInternal, connectorJars);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..4da8bba
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# The loggers below are pinned to WARN regardless of the root logger level. In particular, the
+# dockerjava package shaded into testcontainers is quite verbose at lower levels.
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
new file mode 100644
index 0000000..9e020f1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch7-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch 7 : E2E Tests</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>7.10.2</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch7</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>elasticsearch7-end-to-end-test</finalName>
+							<outputDirectory>dependencies</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${flink.version}</version>
+							<destFileName>flink-connector-test-utils.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
new file mode 100644
index 0000000..7faba25
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            LOG.error("Fetching records failed", e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
new file mode 100644
index 0000000..875d09a
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** Factory that builds upsert {@link UpdateRequest}s for Elasticsearch 7. */
+public class UpdateRequest7Factory implements UpdateRequestFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final String indexName;
+
+    /**
+     * Instantiates a new update request factory for Elasticsearch 7.
+     *
+     * @param indexName The index name.
+     */
+    public UpdateRequest7Factory(String indexName) {
+        this.indexName = indexName;
+    }
+
+    @Override
+    public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+        Map<String, Object> document = UpdateRequestFactory.prepareDoc(element);
+        String documentId = String.valueOf(element.key);
+        return new UpdateRequest(indexName, documentId).doc(document).upsert(document);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
new file mode 100644
index 0000000..59be31c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch7Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch7SinkE2ECase
+        extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SinkE2ECase.class);
+
+    public Elasticsearch7SinkE2ECase() throws Exception {}
+
+    String getElasticsearchContainerName() {
+        return DockerImageVersions.ELASTICSEARCH_7;
+    }
+
+    @TestContext
+    Elasticsearch7SinkExternalContextFactory contextFactory =
+            new Elasticsearch7SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            jarUrl("dependencies/elasticsearch7-end-to-end-test.jar"),
+                            jarUrl("dependencies/flink-connector-test-utils.jar"),
+                            jarUrl("dependencies/flink-connector-elasticsearch-test-utils.jar")));
+
+    /**
+     * Resolves a jar through {@link TestUtils#getResource} and converts it to an absolute URL.
+     *
+     * @param resource path of the jar as understood by {@link TestUtils#getResource}
+     * @return the absolute URL of the resolved jar
+     */
+    private static java.net.URL jarUrl(String resource) throws Exception {
+        return TestUtils.getResource(resource).toAbsolutePath().toUri().toURL();
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
new file mode 100644
index 0000000..aa31ea0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch7SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+    /**
+     * Instantiates a new Elasticsearch 7 sink context base.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+     *     containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     */
+    Elasticsearch7SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        super(addressInternal, connectorJarPaths, new Elasticsearch7Client(addressExternal));
+    }
+
+    @Override
+    public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+        client.createIndexIfDoesNotExist(indexName, 1, 0);
+        return new Elasticsearch7SinkBuilder<KeyValue<Integer, String>>()
+                .setHosts(HttpHost.create(this.addressInternal))
+                .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest7Factory(indexName)))
+                .setBulkFlushMaxActions(BULK_BUFFER)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        return "Elasticsearch 7 sink context.";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
new file mode 100644
index 0000000..c0e3f41
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Factory that creates {@link Elasticsearch7SinkExternalContext} instances. */
+class Elasticsearch7SinkExternalContextFactory
+        extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch7SinkExternalContext> {
+
+    /**
+     * Creates a factory bound to the given container and connector jars.
+     *
+     * @param elasticsearchContainer The Elasticsearch container.
+     * @param connectorJars The connector jars.
+     */
+    Elasticsearch7SinkExternalContextFactory(
+            ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+        super(elasticsearchContainer, connectorJars);
+    }
+
+    @Override
+    public Elasticsearch7SinkExternalContext createExternalContext(String testName) {
+        final String addressExternal = elasticsearchContainer.getHttpHostAddress();
+        final String addressInternal = formatInternalAddress(elasticsearchContainer);
+        return new Elasticsearch7SinkExternalContext(
+                addressExternal, addressInternal, connectorJars);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e48d6c0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# The root logger level is set to OFF to avoid flooding the build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# The loggers below are already active and restrict their packages to WARN and above. The first
+# package, used by testcontainers, is quite verbose
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
+
diff --git a/flink-connector-elasticsearch-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/pom.xml
new file mode 100644
index 0000000..87756e1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connector-elasticsearch-parent</artifactId>
+		<version>3.0-SNAPSHOT</version>
+	</parent>
+
+	<packaging>pom</packaging>
+
+	<artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+	<name>Flink : Connectors : Elasticsearch : E2E Tests</name>
+
+	<modules>
+		<module>flink-connector-elasticsearch-e2e-tests-common</module>
+		<module>flink-connector-elasticsearch6-e2e-tests</module>
+		<module>flink-connector-elasticsearch7-e2e-tests</module>
+	</modules>
+
+	<profiles>
+		<profile>
+			<id>run-end-to-end-tests</id>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<executions>
+							<execution>
+								<id>end-to-end-tests</id>
+								<phase>integration-test</phase>
+								<goals>
+									<goal>test</goal>
+								</goals>
+								<configuration>
+									<includes>
+										<include>**/*.*</include>
+									</includes>
+									<!-- E2E tests must not access flink-dist concurrently. -->
+									<forkCount>1</forkCount>
+									<systemPropertyVariables>
+										<moduleDir>${project.basedir}</moduleDir>
+									</systemPropertyVariables>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-deploy-plugin</artifactId>
+				<configuration>
+					<skip>true</skip>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>default-test</id>
+						<phase>none</phase>
+					</execution>
+					<execution>
+						<id>integration-tests</id>
+						<phase>none</phase>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-shade-plugin</artifactId>
+					<configuration>
+						<artifactSet>
+							<excludes combine.children="append">
+								<exclude>com.google.code.findbugs:jsr305</exclude>
+								<exclude>org.slf4j:slf4j-api</exclude>
+							</excludes>
+						</artifactSet>
+						<filters combine.children="append">
+							<filter>
+								<artifact>*:*</artifact>
+								<excludes>
+									<exclude>META-INF/*.SF</exclude>
+									<exclude>META-INF/*.DSA</exclude>
+									<exclude>META-INF/*.RSA</exclude>
+								</excludes>
+							</filter>
+						</filters>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+</project>