You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/13 14:00:51 UTC
[flink-connector-elasticsearch] 04/06: [FLINK-28410][tests] Sync E2E tests
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
commit a9328dfcd7793b73cddc428d4c4a0aa827d220ca
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Sep 13 12:42:47 2022 +0200
[FLINK-28410][tests] Sync E2E tests
---
.github/workflows/ci.yml | 13 +-
.../pom.xml | 77 +++++++++
.../flink/streaming/tests/ElasticsearchClient.java | 59 +++++++
.../streaming/tests/ElasticsearchDataReader.java | 57 +++++++
.../tests/ElasticsearchSinkE2ECaseBase.java | 101 ++++++++++++
.../ElasticsearchSinkExternalContextBase.java | 123 +++++++++++++++
...lasticsearchSinkExternalContextFactoryBase.java | 59 +++++++
.../streaming/tests/ElasticsearchTestEmitter.java | 51 ++++++
.../org/apache/flink/streaming/tests/KeyValue.java | 92 +++++++++++
.../apache/flink/streaming/tests/QueryParams.java | 174 +++++++++++++++++++++
.../streaming/tests/UpdateRequestFactory.java | 43 +++++
.../flink/test/parameters/ParameterProperty.java | 58 +++++++
.../org/apache/flink/tests/util/TestUtils.java | 85 ++++++++++
.../pom.xml | 120 ++++++++++++++
.../streaming/tests/Elasticsearch6Client.java | 149 ++++++++++++++++++
.../streaming/tests/UpdateRequest6Factory.java | 48 ++++++
.../streaming/tests/Elasticsearch6SinkE2ECase.java | 60 +++++++
.../tests/Elasticsearch6SinkExternalContext.java | 68 ++++++++
.../Elasticsearch6SinkExternalContextFactory.java | 48 ++++++
.../src/test/resources/log4j2-test.properties | 34 ++++
.../pom.xml | 121 ++++++++++++++
.../streaming/tests/Elasticsearch7Client.java | 147 +++++++++++++++++
.../streaming/tests/UpdateRequest7Factory.java | 46 ++++++
.../streaming/tests/Elasticsearch7SinkE2ECase.java | 60 +++++++
.../tests/Elasticsearch7SinkExternalContext.java | 68 ++++++++
.../Elasticsearch7SinkExternalContextFactory.java | 48 ++++++
.../src/test/resources/log4j2-test.properties | 35 +++++
flink-connector-elasticsearch-e2e-tests/pom.xml | 128 +++++++++++++++
28 files changed, 2171 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c23d82c..9aff109 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,6 +26,7 @@ jobs:
jdk: [8, 11]
env:
MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+ FLINK_URL: https://s3.amazonaws.com/flink-nightly/flink-1.16-SNAPSHOT-bin-scala_2.12.tgz
steps:
- run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}"
@@ -45,4 +46,14 @@ jobs:
maven-version: 3.8.6
- name: Compile and test flink-connector-elasticsearch
- run: mvn clean install -Dscala-2.12 -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
+ run: |
+ pushd .. \
+ && wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz \
+ && popd
+
+ mvn clean install -U -B \
+ -Dscala-2.12 \
+ -Prun-end-to-end-tests -DdistDir=$(pwd)/../flink-1.16-SNAPSHOT \
+ -Dflink.convergence.phase=install -Pcheck-convergence \
+ ${{ env.MVN_CONNECTION_OPTIONS }} \
+ -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
new file mode 100644
index 0000000..b7b6e1d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+ <version>3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+ <name>Flink : Connectors : Elasticsearch : E2E tests : Common</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
new file mode 100644
index 0000000..ad97915
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import java.util.List;
+
+/** The version-agnostic Elasticsearch client interface. */
+public interface ElasticsearchClient {
+
+ /**
+ * Delete the index.
+ *
+ * @param indexName The index name.
+ */
+ void deleteIndex(String indexName);
+
+ /**
+ * Refresh the index.
+ *
+ * @param indexName The index name.
+ */
+ void refreshIndex(String indexName);
+
+ /**
+ * Create index if it does not exist.
+ *
+ * @param indexName The index name.
+ * @param shards The number of shards.
+ * @param replicas The number of replicas.
+ */
+ void createIndexIfDoesNotExist(String indexName, int shards, int replicas);
+
+ /** Close the client. @throws Exception The exception. */
+ void close() throws Exception;
+
+ /**
+ * Fetch all results from the index.
+ *
+ * @param params The parameters of the query.
+ * @return All documents from the index.
+ */
+ List<KeyValue<Integer, String>> fetchAll(QueryParams params);
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
new file mode 100644
index 0000000..165ca6d
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Elasticsearch data reader. */
+public class ElasticsearchDataReader
+ implements ExternalSystemDataReader<KeyValue<Integer, String>> {
+ private final ElasticsearchClient client;
+ private final String indexName;
+ private final int pageLength;
+
+ public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) {
+ this.client = checkNotNull(client);
+ this.indexName = checkNotNull(indexName);
+ this.pageLength = pageLength;
+ }
+
+ @Override
+ public List<KeyValue<Integer, String>> poll(Duration timeout) {
+ client.refreshIndex(indexName);
+ QueryParams params =
+ QueryParams.newBuilder(indexName)
+ .sortField("key")
+ .pageLength(pageLength)
+ .trackTotalHits(true)
+ .build();
+ return client.fetchAll(params);
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
new file mode 100644
index 0000000..b6f4a9f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** Base class for end-to-end ElasticsearchSink tests based on connector testing framework. */
+@SuppressWarnings("unused")
+public abstract class ElasticsearchSinkE2ECaseBase<T extends Comparable<T>>
+ extends SinkTestSuiteBase<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkE2ECaseBase.class);
+ private static final int READER_RETRY_ATTEMPTS = 10;
+ private static final int READER_TIMEOUT = -1; // Ignored by ElasticsearchDataReader#poll
+
+ protected static final String ELASTICSEARCH_HOSTNAME = "elasticsearch";
+
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+ // Defines TestEnvironment
+ @TestEnv
+ protected FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+ // Defines ConnectorExternalSystem
+ @TestExternalSystem
+ DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch =
+ DefaultContainerizedExternalSystem.builder()
+ .fromContainer(
+ new ElasticsearchContainer(
+ DockerImageName.parse(getElasticsearchContainerName()))
+ .withEnv(
+ "cluster.routing.allocation.disk.threshold_enabled",
+ "false")
+ .withNetworkAliases(ELASTICSEARCH_HOSTNAME))
+ .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+ .build();
+
+ @Override
+ protected void checkResultWithSemantic(
+ ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+ throws Exception {
+ waitUntilCondition(
+ () -> {
+ try {
+ List<T> result = reader.poll(Duration.ofMillis(READER_TIMEOUT));
+ assertThat(sort(result).iterator())
+ .matchesRecordsFromSource(
+ Collections.singletonList(sort(testData)), semantic);
+ return true;
+ } catch (Throwable t) {
+ LOG.warn("Polled results not as expected", t);
+ return false;
+ }
+ },
+ 5000,
+ READER_RETRY_ATTEMPTS);
+ }
+
+ private List<T> sort(List<T> list) {
+ return list.stream().sorted().collect(Collectors.toList());
+ }
+
+ abstract String getElasticsearchContainerName();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
new file mode 100644
index 0000000..598cf43
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context. */
+abstract class ElasticsearchSinkExternalContextBase
+ implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> {
+ /** Prefix of the per-context index name; a random suffix is appended to it. */
+ protected static final String INDEX_NAME_PREFIX = "es-index";
+
+ private static final int RANDOM_STRING_MAX_LENGTH = 50;
+ private static final int NUM_RECORDS_UPPER_BOUND = 500;
+ private static final int NUM_RECORDS_LOWER_BOUND = 100;
+ protected static final int BULK_BUFFER = 100;
+ protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1;
+ /** The index name. */
+ protected final String indexName;
+
+ /** The address reachable from Flink (internal to the testing environment). */
+ protected final String addressInternal;
+
+ /** The connector jar paths. */
+ protected final List<URL> connectorJarPaths;
+
+ /** The client. */
+ protected final ElasticsearchClient client;
+
+ /**
+ * Instantiates a new Elasticsearch sink context base.
+ *
+ * @param addressInternal The address to access Elasticsearch from within Flink. When running in
+ * a containerized environment, should correspond to the network alias that resolves within
+ * the environment's network together with the exposed port.
+ * @param connectorJarPaths The connector jar paths.
+ * @param client The Elasticsearch client.
+ */
+ ElasticsearchSinkExternalContextBase(
+ String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) {
+ this.addressInternal = checkNotNull(addressInternal);
+ this.connectorJarPaths = checkNotNull(connectorJarPaths);
+ this.client = checkNotNull(client);
+ this.indexName =
+ INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+ }
+
+ @Override
+ public List<KeyValue<Integer, String>> generateTestData(
+ TestingSinkSettings sinkSettings, long seed) {
+ Random random = new Random(seed);
+ int recordNum =
+ random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+ + NUM_RECORDS_LOWER_BOUND;
+
+ return IntStream.range(0, recordNum)
+ .boxed()
+ .map(
+ i -> {
+ int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+ String value = RandomStringUtils.random(valueLength, true, true);
+ return KeyValue.of(i, value);
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void close() {
+ client.deleteIndex(indexName);
+ }
+
+ @Override
+ public List<URL> getConnectorJarPaths() {
+ return connectorJarPaths;
+ }
+
+ @Override
+ public TypeInformation<KeyValue<Integer, String>> getProducedType() {
+ return TypeInformation.of(new TypeHint<KeyValue<Integer, String>>() {});
+ }
+
+ @Override
+ public abstract Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings);
+
+ @Override
+ public abstract ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+ TestingSinkSettings sinkSettings);
+
+ @Override
+ public abstract String toString();
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
new file mode 100644
index 0000000..0da9b2b
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.external.ExternalContext;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context factories. */
+public abstract class ElasticsearchSinkExternalContextFactoryBase<T extends ExternalContext>
+ implements ExternalContextFactory<T> {
+
+ /** The Elasticsearch container. */
+ protected final ElasticsearchContainer elasticsearchContainer;
+
+ /** The connector jars. */
+ protected final List<URL> connectorJars;
+
+ /**
+ * Instantiates a new Elasticsearch sink context factory.
+ *
+ * @param elasticsearchContainer The Elasticsearch container.
+ * @param connectorJars The connector jars.
+ */
+ ElasticsearchSinkExternalContextFactoryBase(
+ ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+ this.elasticsearchContainer = checkNotNull(elasticsearchContainer);
+ this.connectorJars = checkNotNull(connectorJars);
+ }
+
+ protected static String formatInternalAddress(
+ GenericContainer<ElasticsearchContainer> container) {
+ return String.format(
+ "%s:%d", container.getNetworkAliases().get(0), container.getExposedPorts().get(0));
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
new file mode 100644
index 0000000..7a5c36c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test emitter that issues Elasticsearch update requests for the test records. */
+public class ElasticsearchTestEmitter implements ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final UpdateRequestFactory factory;
+
+ /**
+ * Instantiates a new Elasticsearch test emitter.
+ *
+ * @param factory The factory for creating {@link UpdateRequest}s.
+ */
+ public ElasticsearchTestEmitter(UpdateRequestFactory factory) {
+ this.factory = checkNotNull(factory);
+ }
+
+ @Override
+ public void emit(
+ KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+ UpdateRequest updateRequest = factory.createUpdateRequest(element);
+ indexer.add(updateRequest);
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
new file mode 100644
index 0000000..43db0c4
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A {@link Comparable} holder for key-value pairs. */
+public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
+ implements Comparable<KeyValue<K, V>>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The key of the key-value pair. */
+ public K key;
+ /** The value of the key-value pair. */
+ public V value;
+
+ /** Creates a new key-value pair where all fields are null. */
+ public KeyValue() {}
+
+ private KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(KeyValue<K, V> other) {
+ int d = this.key.compareTo(other.key);
+ if (d == 0) {
+ return this.value.compareTo(other.value);
+ }
+ return d;
+ }
+
+ /** Creates a new key-value pair. */
+ public static <K extends Comparable<? super K>, T1 extends Comparable<? super T1>>
+ KeyValue<K, T1> of(K key, T1 value) {
+ return new KeyValue<>(key, value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KeyValue)) {
+ return false;
+ }
+ @SuppressWarnings("rawtypes")
+ KeyValue keyValue = (KeyValue) o;
+ if (key != null ? !key.equals(keyValue.key) : keyValue.key != null) {
+ return false;
+ }
+ if (value != null ? !value.equals(keyValue.value) : keyValue.value != null) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "("
+ + StringUtils.arrayAwareToString(this.key)
+ + ","
+ + StringUtils.arrayAwareToString(this.value)
+ + ")";
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
new file mode 100644
index 0000000..22cf7cf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Immutable holder for the parameters of an Elasticsearch search query. Instances are created
+ * through {@link #newBuilder(String)}.
+ */
+public class QueryParams {
+    private final String indexName;
+    private final String sortField;
+    private final int from;
+    private final int pageLength;
+    private final boolean trackTotalHits;
+
+    private QueryParams(Builder builder) {
+        this.indexName = builder.indexName;
+        this.sortField = builder.sortField;
+        this.from = builder.from;
+        this.pageLength = builder.pageLength;
+        this.trackTotalHits = builder.trackTotalHits;
+    }
+
+    /**
+     * Starts building a new {@code QueryParams}.
+     *
+     * @param indexName The name of the index to query; must not be {@code null}.
+     * @return A builder pre-populated with the index name.
+     */
+    public static Builder newBuilder(String indexName) {
+        return new Builder(indexName);
+    }
+
+    /**
+     * Name of the field the results are sorted by.
+     *
+     * @return The sort field, or {@code null} if none was set on the builder.
+     */
+    public String sortField() {
+        return this.sortField;
+    }
+
+    /**
+     * Offset of the first hit to return. Defaults to {@code 0}.
+     *
+     * @return The offset.
+     */
+    public int from() {
+        return this.from;
+    }
+
+    /**
+     * Number of hits per page. Defaults to {@code 0} if not set on the builder.
+     *
+     * @return The page length.
+     */
+    public int pageLength() {
+        return this.pageLength;
+    }
+
+    /**
+     * Whether the total hit count should be tracked. Defaults to {@code false}.
+     *
+     * @return The flag.
+     */
+    public boolean trackTotalHits() {
+        return this.trackTotalHits;
+    }
+
+    /**
+     * Name of the index to query.
+     *
+     * @return The index name, never {@code null}.
+     */
+    public String indexName() {
+        return this.indexName;
+    }
+
+    /** Fluent builder for {@link QueryParams}; every setter returns the builder itself. */
+    public static final class Builder {
+        private String indexName;
+        private String sortField;
+        private int from;
+        private int pageLength;
+        private boolean trackTotalHits;
+
+        private Builder(String indexName) {
+            this.indexName = checkNotNull(indexName);
+        }
+
+        /**
+         * Replaces the index name given to {@link QueryParams#newBuilder(String)}.
+         *
+         * @param indexName The {@code indexName} to set; must not be {@code null}.
+         * @return A reference to this Builder.
+         */
+        public Builder indexName(String indexName) {
+            this.indexName = checkNotNull(indexName);
+            return this;
+        }
+
+        /**
+         * Sets the field to sort by.
+         *
+         * @param sortField The {@code sortField} to set; must not be {@code null}.
+         * @return A reference to this Builder.
+         */
+        public Builder sortField(String sortField) {
+            this.sortField = checkNotNull(sortField);
+            return this;
+        }
+
+        /**
+         * Sets the offset of the first hit to return.
+         *
+         * @param from The {@code from} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder from(int from) {
+            this.from = from;
+            return this;
+        }
+
+        /**
+         * Sets the number of hits per page.
+         *
+         * @param pageLength The {@code pageLength} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder pageLength(int pageLength) {
+            this.pageLength = pageLength;
+            return this;
+        }
+
+        /**
+         * Sets whether the total hit count should be tracked.
+         *
+         * @param trackTotalHits The {@code trackTotalHits} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder trackTotalHits(boolean trackTotalHits) {
+            this.trackTotalHits = trackTotalHits;
+            return this;
+        }
+
+        /**
+         * Creates the {@code QueryParams} from the values collected so far.
+         *
+         * @return A {@code QueryParams} built with parameters of this {@code QueryParams.Builder}
+         */
+        public QueryParams build() {
+            return new QueryParams(this);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
new file mode 100644
index 0000000..d7b8bb7
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Factory for creating UpdateRequests. */
+public interface UpdateRequestFactory extends Serializable {
+
+    /**
+     * Creates the update request for a single element.
+     *
+     * @param element The element to be written.
+     * @return The update request for the element.
+     */
+    UpdateRequest createUpdateRequest(KeyValue<Integer, String> element);
+
+    /**
+     * Turns a {@link KeyValue} element into the field map used as the Elasticsearch document:
+     * the key is stored under {@code "key"} and the value under {@code "value"}.
+     *
+     * @param element The element to be converted.
+     * @return The map with the element's fields.
+     */
+    static Map<String, Object> prepareDoc(KeyValue<Integer, String> element) {
+        final Map<String, Object> fields = new HashMap<>();
+        fields.put("key", element.key);
+        fields.put("value", element.value);
+        return fields;
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
new file mode 100644
index 0000000..a2bcfdf
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.parameters;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** System-property based parameters for tests and resources. */
+public class ParameterProperty<V> {
+
+    private final String propertyName;
+    private final Function<String, V> converter;
+
+    /**
+     * Creates a parameter backed by the given system property.
+     *
+     * @param propertyName name of the system property to read
+     * @param converter function turning the raw property string into the parameter value
+     */
+    public ParameterProperty(final String propertyName, final Function<String, V> converter) {
+        this.propertyName = propertyName;
+        this.converter = converter;
+    }
+
+    /**
+     * Returns the name of the backing system property.
+     *
+     * @return the system property name
+     */
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    /**
+     * Retrieves the value of this property. The system property is read on every call.
+     *
+     * @return Optional containing the converted value, or an empty Optional if the property is
+     *     not set
+     */
+    public Optional<V> get() {
+        final String rawValue = System.getProperty(propertyName);
+        if (rawValue == null) {
+            return Optional.empty();
+        }
+        return Optional.of(converter.apply(rawValue));
+    }
+
+    /**
+     * Retrieves the value of this property, or the given default if no value was set. The
+     * default is returned as-is and is not passed through the converter.
+     *
+     * @param defaultValue value to return when the property is not set
+     * @return the value of this property, or the given default if no value was set
+     */
+    public V get(final V defaultValue) {
+        final String rawValue = System.getProperty(propertyName);
+        if (rawValue == null) {
+            return defaultValue;
+        }
+        return converter.apply(rawValue);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
new file mode 100644
index 0000000..980aaa9
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.test.parameters.ParameterProperty;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** General test utilities. */
+public enum TestUtils {
+    ;
+
+    /** Optional override of the directory to search, read from the {@code moduleDir} property. */
+    private static final ParameterProperty<Path> MODULE_DIRECTORY =
+            new ParameterProperty<>("moduleDir", Paths::get);
+
+    /**
+     * Searches the module directory recursively for exactly one file or directory whose absolute
+     * path contains a match for the given regex. The module directory is taken from the {@code
+     * moduleDir} system property, falling back to the current working directory. This method is
+     * primarily intended to be used for the initialization of static {@link Path} fields for
+     * resource files (i.e. jar, config file) that reside in the module's {@code target}
+     * directory.
+     *
+     * @param resourceNameRegex regex pattern to match against the absolute path of each entry
+     * @return Path pointing to the single matching resource
+     * @throws RuntimeException if none or multiple resource files could be found, or if the
+     *     directory could not be traversed
+     */
+    public static Path getResource(final String resourceNameRegex) {
+        // If the property is not set then we are most likely running in the IDE, where the
+        // working directory is the module of the test that is currently running, which is
+        // exactly what we want.
+        final Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
+
+        try (Stream<Path> candidates = Files.walk(moduleDirectory)) {
+            // Compile the regex once instead of once per visited path.
+            final Pattern resourcePattern = Pattern.compile(resourceNameRegex);
+            final List<Path> matchingResources =
+                    candidates
+                            .filter(
+                                    candidate ->
+                                            resourcePattern
+                                                    .matcher(candidate.toAbsolutePath().toString())
+                                                    .find())
+                            .collect(Collectors.toList());
+            switch (matchingResources.size()) {
+                case 0:
+                    throw new RuntimeException(
+                            new FileNotFoundException(
+                                    String.format(
+                                            "No resource file could be found that matches the pattern %s. "
+                                                    + "This could mean that the test module must be rebuilt via maven.",
+                                            resourceNameRegex)));
+                case 1:
+                    return matchingResources.get(0);
+                default:
+                    throw new RuntimeException(
+                            new IOException(
+                                    String.format(
+                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
+                                            resourceNameRegex, matchingResources)));
+            }
+        } catch (final IOException ioe) {
+            throw new RuntimeException("Could not search for resource files.", ioe);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
new file mode 100644
index 0000000..c7d706f
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+ <version>3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-elasticsearch6-e2e-tests</artifactId>
+ <name>Flink : Connectors : Elasticsearch 6 : E2E tests</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <elasticsearch.version>6.8.20</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch6</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Shades this module into a jar named elasticsearch6-end-to-end-test in the
+ 'dependencies' output directory. Elasticsearch6SinkE2ECase looks the jar up by this
+ name, so keep finalName in sync with the test. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>elasticsearch6-end-to-end-test</finalName>
+ <outputDirectory>dependencies</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Copies the additional jars needed by the E2E test into
+ ${project.build.directory}/dependencies under fixed file names before the
+ integration tests run. Elasticsearch6SinkE2ECase resolves them via these
+ destFileName values. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <!-- Shared E2E test utilities of this connector. -->
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ <!-- Flink's connector test utilities, versioned with Flink itself. -->
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <destFileName>flink-connector-test-utils.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
new file mode 100644
index 0000000..b2c80fc
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ElasticsearchClient} implementation for Elasticsearch 6 used by the E2E tests. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        // A constructor call never yields null, so no further null check is needed here.
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+    }
+
+    /**
+     * Deletes the given index and then refreshes it. An {@link IOException} during the delete is
+     * logged and not rethrown.
+     *
+     * @param indexName The name of the index to delete.
+     */
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    /**
+     * Refreshes the given index. Failures are logged and not rethrown; a missing index is only
+     * reported at info level.
+     *
+     * @param indexName The name of the index to refresh.
+     */
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName, e);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            } else {
+                // Keep the best-effort contract, but do not hide unexpected failures.
+                LOG.error("Cannot refresh index {}", indexName, e);
+            }
+        }
+    }
+
+    /**
+     * Creates the index with the given number of shards and replicas unless it already exists.
+     * An {@link IOException} is logged and not rethrown.
+     *
+     * @param indexName The name of the index to create.
+     * @param shards The value for {@code index.number_of_shards}.
+     * @param replicas The value for {@code index.number_of_replicas}.
+     */
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName, e);
+        }
+    }
+
+    /** Closes the underlying REST client. */
+    @Override
+    public void close() throws Exception {
+        restClient.close();
+    }
+
+    /**
+     * Runs a search described by {@code params}, sorted ascending by the configured sort field,
+     * and maps every hit to a {@link KeyValue}: the hit id becomes the integer key and the
+     * {@code "value"} entry of the hit source becomes the value.
+     *
+     * @param params The query parameters.
+     * @return The hits of the requested page, or an empty list if the search failed with an
+     *     {@link IOException}.
+     */
+    @Override
+    public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+        try {
+            SearchResponse response =
+                    restClient.search(
+                            new SearchRequest(params.indexName())
+                                    .source(
+                                            new SearchSourceBuilder()
+                                                    .sort(params.sortField(), SortOrder.ASC)
+                                                    .from(params.from())
+                                                    .size(params.pageLength())
+                                                    .trackTotalHits(params.trackTotalHits())),
+                            RequestOptions.DEFAULT);
+            SearchHit[] searchHits = response.getHits().getHits();
+            return Arrays.stream(searchHits)
+                    .map(
+                            searchHit ->
+                                    KeyValue.of(
+                                            Integer.valueOf(searchHit.getId()),
+                                            searchHit.getSourceAsMap().get("value").toString()))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            LOG.error("Fetching records failed", e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
new file mode 100644
index 0000000..9659913
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** Factory for creating UpdateRequests of Elasticsearch6. */
+public class UpdateRequest6Factory implements UpdateRequestFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String indexName;
+
+    /**
+     * Instantiates a new update request factory for Elasticsearch6.
+     *
+     * @param indexName The index name.
+     */
+    public UpdateRequest6Factory(String indexName) {
+        this.indexName = indexName;
+    }
+
+    /**
+     * Creates an upsert-style update request: the element's key (as a string) is the document
+     * id, and the same field map is used both as the update document and as the upsert document.
+     *
+     * @param element The element to be written.
+     * @return The update request targeting this factory's index with type {@code "doc"}.
+     */
+    @Override
+    public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+        final Map<String, Object> document = UpdateRequestFactory.prepareDoc(element);
+        final String documentId = String.valueOf(element.key);
+        final UpdateRequest request = new UpdateRequest(indexName, "doc", documentId);
+        return request.doc(document).upsert(document);
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
new file mode 100644
index 0000000..cc95275
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End to end test for Elasticsearch6Sink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch6SinkE2ECase
+        extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6SinkE2ECase.class);
+
+    // Declared to throw because the field initializer below resolves URLs, which can fail with
+    // a checked exception.
+    public Elasticsearch6SinkE2ECase() throws Exception {}
+
+    String getElasticsearchContainerName() {
+        return DockerImageVersions.ELASTICSEARCH_6;
+    }
+
+    /** Context factory wired with the Elasticsearch container and the jars the job needs. */
+    @TestContext
+    Elasticsearch6SinkExternalContextFactory contextFactory =
+            new Elasticsearch6SinkExternalContextFactory(
+                    elasticsearch.getContainer(),
+                    Arrays.asList(
+                            resourceUrl("dependencies/elasticsearch6-end-to-end-test.jar"),
+                            resourceUrl("dependencies/flink-connector-test-utils.jar"),
+                            resourceUrl(
+                                    "dependencies/flink-connector-elasticsearch-test-utils.jar")));
+
+    /** Locates the resource matching the given regex and returns its absolute location as a URL. */
+    private static java.net.URL resourceUrl(String resourceNameRegex)
+            throws java.net.MalformedURLException {
+        return TestUtils.getResource(resourceNameRegex).toAbsolutePath().toUri().toURL();
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
new file mode 100644
index 0000000..1ccbcd2
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch6SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+    /**
+     * Instantiates a new Elasticsearch 6 sink context.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+     *     containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     */
+    Elasticsearch6SinkExternalContext(
+            String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+        super(addressInternal, connectorJarPaths, new Elasticsearch6Client(addressExternal));
+    }
+
+    /**
+     * Ensures the target index exists (one shard, no replicas) and builds an Elasticsearch 6
+     * sink that writes to it through the internal address.
+     */
+    @Override
+    public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+        client.createIndexIfDoesNotExist(indexName, 1, 0);
+
+        final HttpHost host = HttpHost.create(this.addressInternal);
+        final ElasticsearchTestEmitter emitter =
+                new ElasticsearchTestEmitter(new UpdateRequest6Factory(indexName));
+
+        return new Elasticsearch6SinkBuilder<KeyValue<Integer, String>>()
+                .setHosts(host)
+                .setEmitter(emitter)
+                .setBulkFlushMaxActions(BULK_BUFFER)
+                .build();
+    }
+
+    /** Creates a reader that pages through the test index using this context's client. */
+    @Override
+    public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+            TestingSinkSettings sinkSettings) {
+        return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        return "Elasticsearch 6 sink context.";
+    }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
new file mode 100644
index 0000000..690fbd5
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Factory for {@link Elasticsearch6SinkExternalContext} instances. */
+class Elasticsearch6SinkExternalContextFactory
+ extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch6SinkExternalContext> {
+
+ /**
+ * Instantiates a new Elasticsearch 6 sink external context factory.
+ *
+ * @param elasticsearchContainer The Elasticsearch container whose addresses the contexts use.
+ * @param connectorJars The connector jars handed to every created context.
+ */
+ Elasticsearch6SinkExternalContextFactory(
+ ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+ super(elasticsearchContainer, connectorJars);
+ }
+
+ @Override
+ public Elasticsearch6SinkExternalContext createExternalContext(String testName) {
+ return new Elasticsearch6SinkExternalContext(
+ elasticsearchContainer.getHttpHostAddress(),
+ formatInternalAddress(elasticsearchContainer),
+ connectorJars);
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..4da8bba
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# It is recommended to keep these loggers at WARN when enabling the root logger. The package
+# below, used by testcontainers, is quite verbose.
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
new file mode 100644
index 0000000..9e020f1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+ <version>3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-elasticsearch7-e2e-tests</artifactId>
+ <name>Flink : Connectors : Elasticsearch 7 : E2E Tests</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <elasticsearch.version>7.10.2</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch7</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>elasticsearch7-end-to-end-test</finalName>
+ <outputDirectory>dependencies</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <destFileName>flink-connector-test-utils.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
new file mode 100644
index 0000000..7faba25
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+ private final RestHighLevelClient restClient;
+
+ /**
+ * Instantiates a new Elasticsearch 7 client.
+ *
+ * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+ * the containerized environment).
+ */
+ public Elasticsearch7Client(String addressExternal) {
+ checkNotNull(addressExternal);
+ HttpHost httpHost = HttpHost.create(addressExternal);
+ RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+ this.restClient = new RestHighLevelClient(restClientBuilder);
+ checkNotNull(restClient);
+ }
+
+ @Override
+ public void deleteIndex(String indexName) {
+ DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+ try {
+ restClient.indices().delete(request, RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ LOG.error("Cannot delete index {}", indexName, e);
+ }
+ // This is needed to avoid race conditions between tests that reuse the same index
+ refreshIndex(indexName);
+ }
+
+ @Override
+ public void refreshIndex(String indexName) {
+ RefreshRequest refresh = new RefreshRequest(indexName);
+ try {
+ restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ LOG.error("Cannot delete index {}", indexName, e);
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ LOG.info("Index {} not found", indexName);
+ }
+ }
+ }
+
+ @Override
+ public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+ GetIndexRequest request = new GetIndexRequest(indexName);
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+ createIndexRequest.settings(
+ Settings.builder()
+ .put("index.number_of_shards", shards)
+ .put("index.number_of_replicas", replicas));
+ try {
+ boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+ if (!exists) {
+ restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+ } else {
+ LOG.info("Index already exists {}", indexName);
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot create index {}", indexName, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ restClient.close();
+ }
+
+ @Override
+ public List<KeyValue<Integer, String>> fetchAll(QueryParams params) {
+ try {
+ SearchResponse response =
+ restClient.search(
+ new SearchRequest(params.indexName())
+ .source(
+ new SearchSourceBuilder()
+ .sort(params.sortField(), SortOrder.ASC)
+ .from(params.from())
+ .size(params.pageLength())
+ .trackTotalHits(params.trackTotalHits())),
+ RequestOptions.DEFAULT);
+ SearchHit[] searchHits = response.getHits().getHits();
+ return Arrays.stream(searchHits)
+ .map(
+ searchHit ->
+ KeyValue.of(
+ Integer.valueOf(searchHit.getId()),
+ searchHit.getSourceAsMap().get("value").toString()))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ LOG.error("Fetching records failed", e);
+ return Collections.emptyList();
+ }
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
new file mode 100644
index 0000000..875d09a
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.Map;
+
+/** Factory for creating {@link UpdateRequest}s for Elasticsearch 7. */
+public class UpdateRequest7Factory implements UpdateRequestFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String indexName;
+
+ /**
+ * Instantiates a new update request factory for Elasticsearch 7.
+ *
+ * @param indexName The name of the index targeted by the created update requests.
+ */
+ public UpdateRequest7Factory(String indexName) {
+ this.indexName = indexName;
+ }
+
+ @Override
+ public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) {
+ Map<String, Object> json = UpdateRequestFactory.prepareDoc(element);
+ return new UpdateRequest(indexName, String.valueOf(element.key)).doc(json).upsert(json);
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
new file mode 100644
index 0000000..59be31c
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/** End-to-end test for the Elasticsearch 7 sink, based on the connector testing framework. */
+@SuppressWarnings("unused")
+public class Elasticsearch7SinkE2ECase
+ extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SinkE2ECase.class);
+
+ public Elasticsearch7SinkE2ECase() throws Exception {}
+
+ String getElasticsearchContainerName() {
+ return DockerImageVersions.ELASTICSEARCH_7;
+ }
+
+ @TestContext
+ Elasticsearch7SinkExternalContextFactory contextFactory =
+ new Elasticsearch7SinkExternalContextFactory(
+ elasticsearch.getContainer(),
+ Arrays.asList(
+ TestUtils.getResource("dependencies/elasticsearch7-end-to-end-test.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL(),
+ TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL(),
+ TestUtils.getResource(
+ "dependencies/flink-connector-elasticsearch-test-utils.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL()));
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
new file mode 100644
index 0000000..aa31ea0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.http.HttpHost;
+
+import java.net.URL;
+import java.util.List;
+
+class Elasticsearch7SinkExternalContext extends ElasticsearchSinkExternalContextBase {
+
+ /**
+ * Instantiates a new Elasticsearch 7 sink external context.
+ *
+ * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+ * the containerized environment).
+ * @param addressInternal The address to access Elasticsearch from Flink. When running in a
+ * containerized environment, should correspond to the network alias that resolves within
+ * the environment's network together with the exposed port.
+ * @param connectorJarPaths The connector jar paths.
+ */
+ Elasticsearch7SinkExternalContext(
+ String addressExternal, String addressInternal, List<URL> connectorJarPaths) {
+ super(addressInternal, connectorJarPaths, new Elasticsearch7Client(addressExternal));
+ }
+
+ @Override
+ public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) {
+ client.createIndexIfDoesNotExist(indexName, 1, 0);
+ return new Elasticsearch7SinkBuilder<KeyValue<Integer, String>>()
+ .setHosts(HttpHost.create(this.addressInternal))
+ .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest7Factory(indexName)))
+ .setBulkFlushMaxActions(BULK_BUFFER)
+ .build();
+ }
+
+ @Override
+ public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader(
+ TestingSinkSettings sinkSettings) {
+ return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH);
+ }
+
+ @Override
+ public String toString() {
+ return "Elasticsearch 7 sink context.";
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
new file mode 100644
index 0000000..c0e3f41
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.net.URL;
+import java.util.List;
+
+/** Factory for {@link Elasticsearch7SinkExternalContext} instances. */
+class Elasticsearch7SinkExternalContextFactory
+ extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch7SinkExternalContext> {
+
+ /**
+ * Instantiates a new Elasticsearch 7 sink external context factory.
+ *
+ * @param elasticsearchContainer The Elasticsearch container whose addresses the contexts use.
+ * @param connectorJars The connector jars handed to every created context.
+ */
+ Elasticsearch7SinkExternalContextFactory(
+ ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) {
+ super(elasticsearchContainer, connectorJars);
+ }
+
+ @Override
+ public Elasticsearch7SinkExternalContext createExternalContext(String testName) {
+ return new Elasticsearch7SinkExternalContext(
+ elasticsearchContainer.getHttpHostAddress(),
+ formatInternalAddress(elasticsearchContainer),
+ connectorJars);
+ }
+}
diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e48d6c0
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=DOCKER> %m%n
+# It is recommended to keep these loggers at WARN when enabling the root logger. The package
+# below, used by testcontainers, is quite verbose.
+logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core
+logger.yarn.level=WARN
+logger.yarn.appenderRef.console.ref=TestLogger
+logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils
+logger.testutils.level=WARN
+logger.testutils.appenderRef.console.ref=TestLogger
+
diff --git a/flink-connector-elasticsearch-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/pom.xml
new file mode 100644
index 0000000..87756e1
--- /dev/null
+++ b/flink-connector-elasticsearch-e2e-tests/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-parent</artifactId>
+ <version>3.0-SNAPSHOT</version>
+ </parent>
+
+ <packaging>pom</packaging>
+
+ <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId>
+ <name>Flink : Connectors : Elasticsearch : E2E Tests</name>
+
+ <modules>
+ <module>flink-connector-elasticsearch-e2e-tests-common</module>
+ <module>flink-connector-elasticsearch6-e2e-tests</module>
+ <module>flink-connector-elasticsearch7-e2e-tests</module>
+ </modules>
+
+ <profiles>
+ <profile>
+ <id>run-end-to-end-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>end-to-end-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/*.*</include>
+ </includes>
+ <!-- E2E tests must not access flink-dist concurrently. -->
+ <forkCount>1</forkCount>
+ <systemPropertyVariables>
+ <moduleDir>${project.basedir}</moduleDir>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <artifactSet>
+ <excludes combine.children="append">
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:slf4j-api</exclude>
+ </excludes>
+ </artifactSet>
+ <filters combine.children="append">
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>