Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/01 13:19:41 UTC

[flink] branch release-1.6 updated (dcfd0d3 -> 5b32a8b)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dcfd0d3  [FLINK-9946][tests] Expose Flink version to E2E tests
     new 788b973  [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis
     new 2080576  [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
     new cea6c9a  [FLINK-7386] [elasticsearch] Evolve ES connector API to make it working with Elasticsearch 5.3+
     new cae5f6e  [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support
     new 157db4a  [FLINK-9885] [tests] Add Elasticsearch 6.x end-to-end test
     new 5b32a8b  [FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/connectors/elasticsearch.md               | 195 ++++++++++++++++++-
 .../elasticsearch/BulkProcessorIndexer.java        |  29 ++-
 .../elasticsearch/ElasticsearchApiCallBridge.java  |  23 ++-
 .../elasticsearch/ElasticsearchSinkBase.java       |  23 ++-
 .../connectors/elasticsearch/RequestIndexer.java   |  42 +++-
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |  39 ++--
 .../elasticsearch/ElasticsearchSinkTestBase.java   | 103 ++++++----
 .../EmbeddedElasticsearchNodeEnvironment.java      |   2 +-
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |   7 +-
 .../elasticsearch/ElasticsearchSink.java           |   3 +-
 .../elasticsearch/ElasticsearchSinkITCase.java     |  59 ++++--
 .../Elasticsearch2ApiCallBridge.java               |  16 +-
 .../elasticsearch2/ElasticsearchSink.java          |   2 +-
 .../elasticsearch2/ElasticsearchSinkITCase.java    |  60 ++++--
 .../Elasticsearch5ApiCallBridge.java               |  16 +-
 .../elasticsearch5/ElasticsearchSink.java          |   2 +-
 .../elasticsearch5/ElasticsearchSinkITCase.java    |  65 +++++--
 .../flink-connector-elasticsearch6/pom.xml         | 180 ++++++++++++++++++
 .../Elasticsearch6ApiCallBridge.java}              |  71 +++----
 .../elasticsearch6/ElasticsearchSink.java          | 211 +++++++++++++++++++++
 .../elasticsearch6/RestClientFactory.java          |  18 +-
 .../EmbeddedElasticsearchNodeEnvironmentImpl.java  |  15 +-
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 100 ++++++++++
 .../src/test/resources/log4j-test.properties       |   5 +-
 .../kinesis/internals/KinesisDataFetcher.java      |  46 ++++-
 .../kinesis/internals/ShardConsumer.java           | 206 ++++++++++----------
 .../streaming/connectors/kinesis/util/AWSUtil.java |   4 +-
 .../connectors/kinesis/util/TimeoutLatch.java      |   3 +
 .../testutils/TestableKinesisDataFetcher.java      |   2 +-
 flink-connectors/pom.xml                           |   1 +
 .../pom.xml                                        |  29 ++-
 .../tests/Elasticsearch6SinkExample.java}          |  33 ++--
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 .../test-scripts/elasticsearch-common.sh           |  25 ++-
 .../test-scripts/test_streaming_elasticsearch.sh   |   3 -
 tools/travis_mvn_watchdog.sh                       |   1 +
 37 files changed, 1294 insertions(+), 347 deletions(-)
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/pom.xml
 copy flink-connectors/{flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java => flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java} (59%)
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentFactory.java => flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java (62%)
 copy flink-connectors/{flink-connector-elasticsearch5 => flink-connector-elasticsearch6}/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java (81%)
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 copy {flink-formats/flink-parquet => flink-connectors/flink-connector-elasticsearch6}/src/test/resources/log4j-test.properties (93%)
 copy flink-end-to-end-tests/{flink-elasticsearch2-test => flink-elasticsearch6-test}/pom.xml (75%)
 copy flink-end-to-end-tests/{flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java => flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java} (71%)


[flink] 04/06: [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cae5f6ee3f85a0d331084b1fd5586263dbab2207
Author: Christophe Jolif <cj...@gmail.com>
AuthorDate: Thu Jan 25 22:31:57 2018 +0100

    [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support
---
 docs/dev/connectors/elasticsearch.md               |  55 ++++++-
 .../elasticsearch/ElasticsearchApiCallBridge.java  |   3 +-
 .../elasticsearch/ElasticsearchSinkBase.java       |   3 +-
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |  10 +-
 .../EmbeddedElasticsearchNodeEnvironment.java      |   2 +-
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |   2 +-
 .../flink-connector-elasticsearch6/pom.xml         | 180 +++++++++++++++++++++
 .../Elasticsearch6ApiCallBridge.java               | 110 +++++++++++++
 .../elasticsearch6/ElasticsearchSink.java          |  91 +++++++++++
 .../EmbeddedElasticsearchNodeEnvironmentImpl.java  |  82 ++++++++++
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 152 +++++++++++++++++
 .../examples/ElasticsearchSinkExample.java         |  81 ++++++++++
 .../src/test/resources/log4j-test.properties       |  27 ++++
 flink-connectors/pom.xml                           |   1 +
 14 files changed, 786 insertions(+), 13 deletions(-)

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 52d1b58..20a7d71 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -55,6 +55,11 @@ of the Elasticsearch installation:
         <td>1.3.0</td>
         <td>5.x</td>
     </tr>
+    <tr>
+        <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td>
+        <td>1.6.0</td>
+        <td>6 and later versions</td>
+    </tr>
   </tbody>
 </table>
 
@@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document actions against your clu
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
 Elasticsearch cluster.
 
 The example below shows how to configure and create a sink:
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
     }
 }));{% endhighlight %}
 </div>
+<div data-lang="java, Elasticsearch 6.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+List<HttpHost> httpHosts = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
+}));{% endhighlight %}
+</div>
 <div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
 val input: DataStream[String] = ...
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 </div>
+<div data-lang="scala, Elasticsearch 6.x" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
+    json.put("data", element)
+    
+    return Requests.indexRequest()
+            .index("my-index")
+            .type("my-type")
+            .source(json)
+  }
+
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
 </div>
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how the `TransportClient`-based versions use a `Map` of `String`s to configure the `ElasticsearchSink`.
 The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
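
For reference, a minimal sketch of such a configuration map (a fragment, not part of this
commit; it assumes the TransportClient-based 2.x/5.x connector, and the cluster name is a
placeholder):

    Map<String, String> config = new HashMap<>();
    // must correspond to the name of the Elasticsearch cluster the sink writes to
    config.put("cluster.name", "my-cluster-name");
    // flush after every element instead of buffering bulk requests
    config.put("bulk.flush.max.actions", "1");
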
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 1c501bf..90d84f3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -41,8 +41,7 @@ import java.util.Map;
 public abstract class ElasticsearchApiCallBridge implements Serializable {
 
 	/**
-	 * Creates an Elasticsearch client implementing {@link AutoCloseable}. This can
-	 * be a {@link org.elasticsearch.client.Client} or {@link org.elasticsearch.client.RestHighLevelClient}
+	 * Creates an Elasticsearch client implementing {@link AutoCloseable}.
 	 *
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 0305ee3d..9830484 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -32,7 +32,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -59,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
  * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
+ * for example, to create an Elasticsearch {@link Client} or {@link RestHighLevelClient}, handle failed item responses, etc.
  *
  * @param <T> Type of the elements handled by this sink
  */
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 5a161a7..460e939 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -540,6 +540,11 @@ public class ElasticsearchSinkBaseTest {
 			return mock(Client.class);
 		}
 
+		@Override
+		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+			return null;
+		}
+
 		@Nullable
 		@Override
 		public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
@@ -551,11 +556,6 @@ public class ElasticsearchSinkBaseTest {
 		}
 
 		@Override
-		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-			return null;
-		}
-
-		@Override
 		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
 			// no need for this in the test cases here
 		}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index ea6e7a3..fd14ba3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -29,7 +29,7 @@ import java.io.File;
  *       also be located under the same package. The intentional package-private accessibility of this interface
  *       enforces that.
  */
-interface EmbeddedElasticsearchNodeEnvironment {
+public interface EmbeddedElasticsearchNodeEnvironment {
 
 	/**
 	 * Start an embedded Elasticsearch node instance.
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 6f49206..28d5f34 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 				.data(false)
 				.node();
 
-			Client client = node.client();
+			AutoCloseable client = node.client();
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Created Elasticsearch client from embedded node");
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
new file mode 100644
index 0000000..e453837
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+	<name>flink-connector-elasticsearch6</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>6.3.1</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Dependency for Elasticsearch 6.x REST Client -->
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<!--
+			Elasticsearch 6.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 6.x is routed to SLF4J. This way, user projects can remain flexible
+			in the logging implementation preferred.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-to-slf4j</artifactId>
+			<version>2.7</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<!--
+			Including the Elasticsearch transport dependency for tests. The Netty 3 transport is no longer available in 6.x.
+		-->
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.plugin</groupId>
+			<artifactId>transport-netty4-client</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!--
+			Including Log4j2 dependencies for tests is required for the
+			embedded Elasticsearch nodes used in tests to run correctly.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!--
+				For the tests, we need to exclude the Log4j2 to slf4j adapter dependency
+				and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node
+				used in tests will fail to work.
+
+				In other words, the connector jar routes Elasticsearch 6.x's Log4j2 APIs to SLF4J,
+				but for the test builds, we still stick to directly using Log4j2.
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.12.2</version>
+				<configuration>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+					</classpathDependencyExcludes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 0000000..2cb4ea0
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ */
+public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -5222683870097809633L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+	/**
+	 * User-provided HTTP hosts.
+	 */
+	private final List<HttpHost> httpHosts;
+
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+		this.httpHosts = httpHosts;
+	}
+
+	@Override
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
+		RestHighLevelClient rhlClient =
+			new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
+		}
+
+		return rhlClient;
+	}
+
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		RestHighLevelClient rhlClient = (RestHighLevelClient) client;
+		return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+}
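
To see how a call bridge like the one above is consumed, here is a small self-contained
sketch (not part of this commit) that wires a RestHighLevelClient into a BulkProcessor the
same way createClient(...) and createBulkProcessorBuilder(...) do. It assumes the
Elasticsearch 6.3.1 client declared in the pom; the host and the no-op listener are
placeholders:

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    public class BridgeUsageSketch {
        public static void main(String[] args) throws Exception {
            // Build the REST client the same way createClient(...) above does.
            RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));

            // A no-op listener; the sink installs one that records failed bulk items.
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override public void beforeBulk(long executionId, BulkRequest request) {}
                @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
                @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
            };

            // The bridge hands the client's bulkAsync method to the BulkProcessor,
            // which batches requests before sending them to the cluster.
            BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener)
                .setBulkActions(1) // flush after every request
                .build();

            bulkProcessor.close();
            client.close();
        }
    }
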
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 0000000..3f75b5f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided HTTP hosts passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor is used for the sink's configuration, such as the
+ * {@link BulkProcessor} flush settings listed below.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 *
+	 * @param userConfig The map of user settings that are used to configure the sink's internal {@link BulkProcessor}
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 *
+	 * @param userConfig The map of user settings that are used to configure the sink's internal {@link BulkProcessor}
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest ActionRequests}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch6ApiCallBridge(httpHosts), userConfig, elasticsearchSinkFunction, failureHandler);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..f419b41
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			Settings settings = Settings.builder()
+				.put("cluster.name", clusterName)
+				.put("http.enabled", false)
+				.put("path.home", tmpDataFolder.getParent())
+				.put("path.data", tmpDataFolder.getAbsolutePath())
+				.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
+				.build();
+
+			node = new PluginNode(settings);
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+	private static class PluginNode extends Node {
+		public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty4Plugin.class));
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..2170771
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+	/**
+	 * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
+	 */
+	public void runTransportClientTest() throws Exception {
+		final String index = "transport-client-test-index";
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+		env.execute("Elasticsearch RestHighLevelClient Test");
+
+		// verify the results
+		Client client = embeddedNodeEnv.getClient();
+		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+		client.close();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+	 */
+	public void runNullTransportClientTest() throws Exception {
+		try {
+			Map<String, String> userConfig = new HashMap<>();
+			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+			createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+	 */
+	public void runEmptyTransportClientTest() throws Exception {
+		try {
+			Map<String, String> userConfig = new HashMap<>();
+			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+			createElasticsearchSink6(userConfig,
+				Collections.<HttpHost>emptyList(),
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
+	 */
+	public void runTransportClientFailsTest() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+		try {
+			env.execute("Elasticsearch Transport Client Test");
+		} catch (JobExecutionException expectedException) {
+			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+			return;
+		}
+
+		fail();
+	}
+
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return null;
+	}
+
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+		ArrayList<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+
+	private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..de1670f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have an Elasticsearch cluster running and reachable at the configured HTTP host ("127.0.0.1:9200" below).
+ */
+public class ElasticsearchSinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index("my-index")
+			.type("my-type")
+			.id(element)
+			.source(json);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 782d2be..3ff1398 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-elasticsearch2</module>
 		<module>flink-connector-elasticsearch5</module>
+		<module>flink-connector-elasticsearch6</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>


[flink] 01/06: [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 788b973d1265928f00055ce10fd03a0247f476d0
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
AuthorDate: Tue Jul 24 11:44:08 2018 -0700

    [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis
    
    This closes #6408.
---
 .../kinesis/internals/ShardConsumer.java           | 85 +++++++++++++---------
 1 file changed, 50 insertions(+), 35 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 77d180c..b14c6a4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -207,7 +207,7 @@ public class ShardConsumer<T> implements Runnable {
 				}
 			}
 
-			long lastTimeNanos = 0;
+			long processingStartTimeNanos = System.nanoTime();
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -215,14 +215,6 @@ public class ShardConsumer<T> implements Runnable {
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-					if (fetchIntervalMillis != 0) {
-						long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
-						long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
-						if (sleepTimeMillis > 0) {
-							Thread.sleep(sleepTimeMillis);
-						}
-						lastTimeNanos = System.nanoTime();
-					}
 
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
@@ -233,19 +225,17 @@ public class ShardConsumer<T> implements Runnable {
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;
-
 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
@@ -254,6 +244,50 @@ public class ShardConsumer<T> implements Runnable {
 	}
 
 	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException If the thread is interrupted while sleeping
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
+	 * @return The adapted maxNumberOfRecordsPerFetch
+	 */
+	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
+			int maxNumberOfRecordsPerFetch) {
+		if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+			// Adjust number of records to fetch from the shard depending on current average record size
+			// to optimize for the 2 MB/sec read limit
+			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+			// Ensure the value is not more than 10000L
+			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+		}
+		return maxNumberOfRecordsPerFetch;
+	}
+
+	/**
 	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
 	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
 	 * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
@@ -347,23 +381,4 @@ public class ShardConsumer<T> implements Runnable {
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
-
-	/**
-	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
-	 * to optimize 2 Mb / sec read limits.
-	 *
-	 * @param averageRecordSizeBytes
-	 * @return adaptedMaxRecordsPerFetch
-	 */
-
-	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
-		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
-		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
-				adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
-
-				// Ensure the value is not more than 10000L
-				adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
-			}
-		return adaptedMaxRecordsPerFetch;
-	}
 }
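
To make the new adaptive-read arithmetic concrete, here is a standalone sketch (not part of
this commit) of one pass through adaptRecordsToRead, assuming the 2 MB/sec per-shard read
limit and the 10,000-record GetRecords cap referenced in the code; the sample numbers are
invented:

    public class AdaptiveReadSketch {
        // Assumed values of the connector's constants
        // KINESIS_SHARD_BYTES_PER_SECOND_LIMIT and DEFAULT_SHARD_GETRECORDS_MAX.
        private static final long SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
        private static final int GETRECORDS_MAX = 10_000;

        public static void main(String[] args) {
            long runLoopTimeNanos = 250_000_000L;     // one loop pass took 250 ms -> 4 Hz
            int numRecords = 500;                     // records returned by the last fetch
            long recordBatchSizeBytes = 500 * 1024L;  // ~1 KiB per record

            long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;   // 1024 bytes
            double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;      // 4.0 Hz
            // Size each read so that reads at this frequency stay under the shard limit.
            double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;   // 524288
            int maxRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); // 512
            maxRecordsPerFetch = Math.min(maxRecordsPerFetch, GETRECORDS_MAX);

            System.out.println("next maxNumberOfRecordsPerFetch = " + maxRecordsPerFetch);
        }
    }
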


[flink] 06/06: [FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b32a8beeba84f14a1e419d24543446264e0dff8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 17:21:34 2018 +0800

    [FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector
    
    This closes #6391.
---
 docs/dev/connectors/elasticsearch.md               | 192 ++++++++++++++++++---
 .../elasticsearch/ElasticsearchApiCallBridge.java  |  22 ++-
 .../elasticsearch/ElasticsearchSinkBase.java       |  24 ++-
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |   8 +-
 .../elasticsearch/ElasticsearchSinkTestBase.java   | 103 +++++++----
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |  10 +-
 .../elasticsearch/ElasticsearchSink.java           |   3 +-
 .../elasticsearch/ElasticsearchSinkITCase.java     |  59 +++++--
 .../Elasticsearch2ApiCallBridge.java               |   9 +-
 .../elasticsearch2/ElasticsearchSink.java          |   2 +-
 .../elasticsearch2/ElasticsearchSinkITCase.java    |  60 +++++--
 .../Elasticsearch5ApiCallBridge.java               |   9 +-
 .../elasticsearch5/ElasticsearchSink.java          |   2 +-
 .../elasticsearch5/ElasticsearchSinkITCase.java    |  65 +++++--
 .../flink-connector-elasticsearch6/pom.xml         |   6 +-
 .../Elasticsearch6ApiCallBridge.java               |  35 +++-
 .../elasticsearch6/ElasticsearchSink.java          | 168 +++++++++++++++---
 .../elasticsearch6/RestClientFactory.java          |  40 +++++
 .../EmbeddedElasticsearchNodeEnvironmentImpl.java  |   5 +-
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 144 +++++-----------
 .../examples/ElasticsearchSinkExample.java         |  81 ---------
 .../src/test/resources/log4j-test.properties       |   3 -
 .../streaming/tests/Elasticsearch6SinkExample.java |  19 +-
 .../test-scripts/elasticsearch-common.sh           |  25 ++-
 .../test-scripts/test_streaming_elasticsearch.sh   |   3 -
 tools/travis_mvn_watchdog.sh                       |   1 +
 26 files changed, 703 insertions(+), 395 deletions(-)

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 20a7d71..bafe391 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -84,6 +84,23 @@ The example below shows how to configure and create a sink:
 <div class="codetabs" markdown="1">
 <div data-lang="java, Elasticsearch 1.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -115,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 </div>
 <div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -145,31 +178,83 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 </div>
 <div data-lang="java, Elasticsearch 6.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 List<HttpHost> httpHosts = new ArrayList<>();
 httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
 
-input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-    
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-    
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
+// use an ElasticsearchSink.Builder to create an ElasticsearchSink
+ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+    httpHosts,
+    new ElasticsearchSinkFunction<String>() {
+        public IndexRequest createIndexRequest(String element) {
+            Map<String, String> json = new HashMap<>();
+            json.put("data", element);
+        
+            return Requests.indexRequest()
+                    .index("my-index")
+                    .type("my-type")
+                    .source(json);
+        }
+        
+        @Override
+        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+            indexer.add(createIndexRequest(element));
+        }
     }
-}));{% endhighlight %}
+);
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1);
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder -> {
+    restClientBuilder.setDefaultHeaders(...);
+    restClientBuilder.setMaxRetryTimeoutMillis(...);
+    restClientBuilder.setPathPrefix(...);
+    restClientBuilder.setHttpClientConfigCallback(...);
+  }
+);
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build());
+{% endhighlight %}
 </div>
 <div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.common.transport.TransportAddress
+
+import java.net.InetAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -196,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 </div>
 <div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -222,33 +323,72 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 </div>
 <div data-lang="scala, Elasticsearch 6.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
+
+import org.apache.http.HttpHost
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.util.ArrayList
+import java.util.List
+
 val input: DataStream[String] = ...
 
 val httpHosts = new java.util.ArrayList[HttpHost]
 httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
 httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
 
-input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new java.util.HashMap[String, String]
-    json.put("data", element)
-    
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json)
+val esSinkBuilder = new ElasticsearchSink.Builder[String](
+  httpHosts,
+  new ElasticsearchSinkFunction[String] {
+    def createIndexRequest(element: String): IndexRequest = {
+      val json = new java.util.HashMap[String, String]
+      json.put("data", element)
+      
+      return Requests.indexRequest()
+              .index("my-index")
+              .`type`("my-type")
+              .source(json)
+    }
+
+    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+      indexer.add(createIndexRequest(element))
+    }
   }
-}))
+)
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1)
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder => {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+)
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build)
 {% endhighlight %}
 </div>
 </div>
 
-Note how `TransportClient` based version use a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate
+with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s
+is used to configure the `ElasticsearchSink`. This config map will be directly
+forwarded when creating the internally used `TransportClient`.
 The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
 the name of your cluster.
 
+For Elasticsearch 6.x and above, the `RestHighLevelClient` is used internally for cluster communication.
+By default, the connector uses the REST client's default configuration. To customize the
+REST client's configuration, users can provide a `RestClientFactory` implementation when
+setting up the `ElasticsearchSink.Builder` that builds the sink, as sketched below.
+
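+As a minimal sketch (the class name and header value here are illustrative), a custom
+`RestClientFactory` that sets a path prefix and a default header could look like this:
+
+{% highlight java %}
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.client.RestClientBuilder;
+
+public class HeaderSettingRestClientFactory implements RestClientFactory {
+
+    @Override
+    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+        // route all requests under a path prefix and attach a default header
+        restClientBuilder.setPathPrefix("/es");
+        restClientBuilder.setDefaultHeaders(new Header[] {new BasicHeader("X-Application", "flink-job")});
+    }
+}
+{% endhighlight %}
+
+Such a factory is then passed to the builder via `esSinkBuilder.setRestClientFactory(...)`.
+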
 Also note that the example only demonstrates performing a single index
 request for each incoming element. Generally, the `ElasticsearchSinkFunction`
 can be used to perform multiple requests of different types (ex.,
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 90d84f3..f1dcc83 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkProcessor;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
@@ -36,9 +37,11 @@ import java.util.Map;
  * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
  * is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client type, which implements {@link AutoCloseable}.
  */
 @Internal
-public abstract class ElasticsearchApiCallBridge implements Serializable {
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
 
 	/**
 	 * Creates an Elasticsearch client implementing {@link AutoCloseable}.
@@ -46,9 +49,16 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
 	 */
-	public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+	C createClient(Map<String, String> clientConfig) throws IOException;
 
-	public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
+	/**
+	 * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+	 *
+	 * @param client the Elasticsearch client.
+	 * @param listener the bulk processor listener.
+	 * @return the bulk processor builder.
+	 */
+	BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
 
 	/**
 	 * Extracts the cause of failure of a bulk item action.
@@ -56,7 +66,7 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param bulkItemResponse the bulk item response to extract cause of failure
 	 * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful).
 	 */
-	public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
 	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -65,14 +75,14 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param builder the {@link BulkProcessor.Builder} to configure.
 	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
 	 */
-	public abstract void configureBulkProcessorBackoff(
+	void configureBulkProcessorBackoff(
 		BulkProcessor.Builder builder,
 		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
 	/**
 	 * Perform any necessary state cleanup.
 	 */
-	public void cleanup() {
+	default void cleanup() {
 		// nothing to cleanup by default
 	}
 
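To illustrate the reworked contract above, a minimal implementation against the new
generic interface could look as follows (a sketch only: the class name and hard-coded
host are hypothetical; it targets the 6.x RestHighLevelClient the same way the bridge
later in this commit does):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;

public class StubRestApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {

	private static final long serialVersionUID = 1L;

	@Override
	public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
		// hypothetical host; a real bridge would take addresses via its constructor
		return new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
	}

	@Override
	public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
		return BulkProcessor.builder(client::bulkAsync, listener);
	}

	@Override
	@Nullable
	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
		return bulkItemResponse.isFailed() ? bulkItemResponse.getFailure().getCause() : null;
	}

	@Override
	public void configureBulkProcessorBackoff(
			BulkProcessor.Builder builder,
			@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
		// backoff configuration omitted in this sketch
	}

	// cleanup() is inherited as a no-op default method, one of the
	// simplifications the interface rework enables
}
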
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9830484..7dac06c 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +33,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -58,12 +60,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
  * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client} or {@RestHighLevelClient}, handle failed item responses, etc.
+ * for example, to create an Elasticsearch {@link Client}, handle failed item responses, etc.
  *
  * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
  */
 @Internal
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = -1007596293618451942L;
 
@@ -84,6 +87,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	/**
 	 * Used to control whether the retry delay should increase exponentially or remain constant.
 	 */
+	@PublicEvolving
 	public enum FlushBackoffType {
 		CONSTANT,
 		EXPONENTIAL
@@ -134,14 +138,20 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 
 	private final Integer bulkProcessorFlushMaxActions;
 	private final Integer bulkProcessorFlushMaxSizeMb;
-	private final Integer bulkProcessorFlushIntervalMillis;
+	private final Long bulkProcessorFlushIntervalMillis;
 	private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
 
 	// ------------------------------------------------------------------------
 	//  User-facing API and configuration
 	// ------------------------------------------------------------------------
 
-	/** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+	/**
+	 * The config map that contains configuration for the bulk flushing behaviours.
+	 *
+	 * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this config
+	 * map would also contain Elasticsearch-shipped configuration, and therefore this config map
+	 * would also be forwarded when creating the Elasticsearch client.
+	 */
 	private final Map<String, String> userConfig;
 
 	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
@@ -161,7 +171,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	// ------------------------------------------------------------------------
 
 	/** Call bridge for version-specific Elasticsearch APIs. */
-	private final ElasticsearchApiCallBridge callBridge;
+	private final ElasticsearchApiCallBridge<C> callBridge;
 
 	/**
 	 * Number of pending action requests not yet acknowledged by Elasticsearch.
@@ -175,7 +185,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	private AtomicLong numPendingRequests = new AtomicLong(0);
 
 	/** Elasticsearch client created using the call bridge. */
-	private transient AutoCloseable client;
+	private transient C client;
 
 	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
 	private transient BulkProcessor bulkProcessor;
@@ -236,7 +246,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 		}
 
 		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+			bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
 			userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
 		} else {
 			bulkProcessorFlushIntervalMillis = null;
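
A note on the Integer-to-Long switch above: the bulk flush interval is now parsed with
ParameterTool#getLong, so interval values are no longer confined to the int range. A
minimal sketch of driving these settings through the user config map (the key constants
appear in this diff; the values are illustrative):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import java.util.HashMap;
import java.util.Map;

Map<String, String> userConfig = new HashMap<>();
// flush after every element, i.e. no buffering
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
// read as a long since this change
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");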
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 460e939..369d26a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -411,7 +411,7 @@ public class ElasticsearchSinkBaseTest {
 		testHarness.close();
 	}
 
-	private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+	private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
 		private static final long serialVersionUID = 5051907841570096991L;
 
@@ -531,17 +531,17 @@ public class ElasticsearchSinkBaseTest {
 		}
 	}
 
-	private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge {
+	private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
 		private static final long serialVersionUID = -4272760730959041699L;
 
 		@Override
-		public AutoCloseable createClient(Map<String, String> clientConfig) {
+		public Client createClient(Map<String, String> clientConfig) {
 			return mock(Client.class);
 		}
 
 		@Override
-		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
 			return null;
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index df3779b..819ffba 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -34,19 +33,20 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ *
+ * @param <C> Elasticsearch client type
+ * @param <A> The address type to use
  */
-public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
 
@@ -85,24 +85,21 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+	 * Tests that the Elasticsearch sink works properly.
 	 */
-	public void runTransportClientTest() throws Exception {
-		final String index = "transport-client-test-index";
+	public void runElasticsearchSinkTest() throws Exception {
+		final String index = "elasticsearch-sink-test-index";
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", CLUSTER_NAME);
-
 		source.addSink(createElasticsearchSinkForEmbeddedNode(
-			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+				1,
+				CLUSTER_NAME,
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
 
-		env.execute("Elasticsearch TransportClient Test");
+		env.execute("Elasticsearch Sink Test");
 
 		// verify the results
 		Client client = embeddedNodeEnv.getClient();
@@ -112,16 +109,20 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code null}.
 	 */
-	public void runNullTransportClientTest() throws Exception {
+	public void runNullAddressesTest() throws Exception {
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", CLUSTER_NAME);
 
 		try {
-			createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
+			createElasticsearchSink(
+					1,
+					CLUSTER_NAME,
+					null,
+					new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException | NullPointerException expectedException) {
 			// test passes
 			return;
 		}
@@ -130,18 +131,19 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty.
 	 */
-	public void runEmptyTransportClientTest() throws Exception {
+	public void runEmptyAddressesTest() throws Exception {
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", CLUSTER_NAME);
 
 		try {
 			createElasticsearchSink(
-				userConfig,
-				Collections.<InetSocketAddress>emptyList(),
-				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+					1,
+					CLUSTER_NAME,
+					Collections.emptyList(),
+					new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
 		} catch (IllegalArgumentException expectedException) {
 			// test passes
 			return;
@@ -153,39 +155,66 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	/**
 	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
 	 */
-	public void runTransportClientFailsTest() throws Exception {
+	public void runInvalidElasticsearchClusterTest() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", "invalid-cluster-name");
 
-		source.addSink(createElasticsearchSinkForEmbeddedNode(
-			Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+		source.addSink(createElasticsearchSinkForNode(
+				1,
+				"invalid-cluster-name",
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
+				"123.123.123.123")); // incorrect ip address
 
 		try {
-			env.execute("Elasticsearch Transport Client Test");
+			env.execute("Elasticsearch Sink Test");
 		} catch (JobExecutionException expectedException) {
-			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+			// test passes
 			return;
 		}
 
 		fail();
 	}
 
+	/**
+	 * Utility method to create a user config map.
+	 */
+	protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", clusterName);
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions));
+
+		return userConfig;
+	}
+
 	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
-	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																			List<InetSocketAddress> transportAddresses,
-																			ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<A> addresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction);
 
 	/**
 	 * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
 	 *
-	 * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+	 * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List, ElasticsearchSinkFunction)}
 	 * because the Elasticsearch Java API to do so is incompatible across different versions.
 	 */
-	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception;
+
+	/**
+	 * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node.
+	 */
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception;
 }
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 28d5f34..4f1cd08 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
 	private static final long serialVersionUID = -2632363720584123682L;
 
@@ -70,7 +70,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public Client createClient(Map<String, String> clientConfig) {
 		if (transportAddresses == null) {
 
 			// Make sure that we disable http access to our embedded node
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 				.data(false)
 				.node();
 
-			AutoCloseable client = node.client();
+			Client client = node.client();
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Created Elasticsearch client from embedded node");
@@ -116,8 +116,8 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index e8eccd9..d5e1d1f 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.node.Node;
@@ -64,7 +65,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 5489290..2f1a65c 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -28,10 +28,12 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUti
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,26 +44,26 @@ import java.util.Map;
 /**
  * IT Cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception{
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception {
+		runInvalidElasticsearchClusterTest();
 	}
 
 	// -- Tests specific to Elasticsearch 1.x --
@@ -102,19 +104,28 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> transportAddresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
+				elasticsearchSinkFunction);
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		Map<String, String> userConfig = createUserConfig(bulkFlushMaxActions, clusterName);
 
 		// Elasticsearch 1.x requires this setting when using
 		// LocalTransportAddress to connect to a local embedded node
-		userConfig = new HashMap<>(userConfig);
 		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = new ArrayList<>();
@@ -126,6 +137,22 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 			elasticsearchSinkFunction);
 	}
 
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300));
+
+		return new ElasticsearchSink<>(
+			Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+			transports,
+			elasticsearchSinkFunction);
+	}
+
 	/**
 	 * An {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
 	 */
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 80c1b3a..73a69eb 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -44,7 +43,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
 
 	private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +62,7 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public TransportClient createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
 
 		TransportClient transportClient = TransportClient.builder().settings(settings).build();
@@ -84,8 +83,8 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index ffccacf..a911905 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -58,7 +58,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 7ded893..7887e72 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -17,57 +17,81 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception{
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception {
+		runInvalidElasticsearchClusterTest();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> transportAddresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transportAddresses,
+				elasticsearchSinkFunction);
+	}
+
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
 
 		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+		transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transports,
+				elasticsearchSinkFunction);
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1e73feb..a3453ec 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
@@ -47,7 +46,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +65,7 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public TransportClient createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.builder().put(clientConfig)
 			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
 			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -90,8 +89,8 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 6c09337..b99b353 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -59,7 +59,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index ad7c664..67daa40 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -18,58 +18,85 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE directly, since the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch
+ * embedded node used in the tests to work properly.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception {
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception {
+		runInvalidElasticsearchClusterTest();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> addresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				addresses,
+				elasticsearchSinkFunction);
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
+	}
+
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
 
 		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+		transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transports,
+				elasticsearchSinkFunction);
 	}
-
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index e453837..ef06d80 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -81,7 +81,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-to-slf4j</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -141,14 +141,14 @@ under the License.
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-api</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-core</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 2cb4ea0..03bf9c0 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.util.Preconditions;
@@ -26,6 +27,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
@@ -33,13 +35,15 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
  */
-public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+@Internal
+public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -50,15 +54,31 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
 	 */
 	private final List<HttpHost> httpHosts;
 
-	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+	/**
+	 * The factory to configure the rest client.
+	 */
+	private final RestClientFactory restClientFactory;
+
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
 		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
 		this.httpHosts = httpHosts;
+		this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
-		RestHighLevelClient rhlClient =
-			new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
+	public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
+		RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+		restClientFactory.configureRestClientBuilder(builder);
+
+		RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+		}
+
+		if (!rhlClient.ping()) {
+			throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+		}
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
@@ -68,9 +88,8 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		RestHighLevelClient rhlClient = (RestHighLevelClient) client;
-		return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client::bulkAsync, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 3f75b5f..4e7a263 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -17,17 +17,19 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.RestHighLevelClient;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,10 +40,6 @@ import java.util.Map;
  * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
- * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
- * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
- * which should be set to the name of the cluster that the sink should emit to.
- *
  * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
@@ -58,34 +56,156 @@ import java.util.Map;
  *
  * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
-	 *
-	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
-	 * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
-	 */
-	public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+	private ElasticsearchSink(
+		Map<String, String> bulkRequestsConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler,
+		RestClientFactory restClientFactory) {
 
-		this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+		super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
 	}
 
 	/**
-	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 * A builder for creating an {@link ElasticsearchSink}.
 	 *
-	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
-	 * @param failureHandler This is used to handle failed {@link ActionRequest}
-	 * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
+	 * @param <T> Type of the elements handled by the sink this builder creates.
 	 */
-	public ElasticsearchSink(
-		Map<String, String> userConfig,
-		List<HttpHost> httpHosts,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
-		ActionRequestFailureHandler failureHandler) {
+	@PublicEvolving
+	public static class Builder<T> {
+
+		private final List<HttpHost> httpHosts;
+		private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+		private Map<String, String> bulkRequestsConfig = new HashMap<>();
+		private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+		private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+		/**
+		 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+		 *
+		 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects.
+		 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element.
+		 */
+		public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+			this.httpHosts = Preconditions.checkNotNull(httpHosts);
+			this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+		}
+
+		/**
+		 * Sets the maximum number of actions to buffer for each bulk request.
+		 *
+		 * @param numMaxActions the maximum number of actions to buffer per bulk request.
+		 */
+		public void setBulkFlushMaxActions(int numMaxActions) {
+			Preconditions.checkArgument(
+				numMaxActions > 0,
+				"Max number of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+		}
+
+		/**
+		 * Sets the maximum size, in megabytes (MB), of buffered actions per bulk request.
+		 *
+		 * @param maxSizeMb the maximum size of buffered actions, in MB.
+		 */
+		public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+			Preconditions.checkArgument(
+				maxSizeMb > 0,
+				"Max size of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+		}
+
+		/**
+		 * Sets the bulk flush interval, in milliseconds.
+		 *
+		 * @param intervalMillis the bulk flush interval, in milliseconds.
+		 */
+		public void setBulkFlushInterval(long intervalMillis) {
+			Preconditions.checkArgument(
+				intervalMillis >= 0,
+				"Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+		}
+
+		/**
+		 * Sets whether or not to enable bulk flush backoff behaviour.
+		 *
+		 * @param enabled whether or not to enable backoffs.
+		 */
+		public void setBulkFlushBackoff(boolean enabled) {
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+		}
+
+		/**
+		 * Sets the type of backoff to use when flushing bulk requests.
+		 *
+		 * @param flushBackoffType the backoff type to use.
+		 */
+		public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+			this.bulkRequestsConfig.put(
+				CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+				Preconditions.checkNotNull(flushBackoffType).toString());
+		}
+
+		/**
+		 * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+		 *
+		 * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
+		 */
+		public void setBulkFlushBackoffRetries(int maxRetries) {
+			Preconditions.checkArgument(
+				maxRetries > 0,
+				"Max number of backoff attempts must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+		}
+
+		/**
+		 * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 *
+		 * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 */
+		public void setBulkFlushBackoffDelay(long delayMillis) {
+			Preconditions.checkArgument(
+				delayMillis >= 0,
+				"Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+		}
+
+		/**
+		 * Sets a failure handler for action requests.
+		 *
+		 * @param failureHandler This is used to handle failed {@link ActionRequest}.
+		 */
+		public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+			this.failureHandler = Preconditions.checkNotNull(failureHandler);
+		}
+
+		/**
+		 * Sets a REST client factory for custom client configuration.
+		 *
+		 * @param restClientFactory the factory that configures the rest client.
+		 */
+		public void setRestClientFactory(RestClientFactory restClientFactory) {
+			this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+		}
 
-		super(new Elasticsearch6ApiCallBridge(httpHosts),  userConfig, elasticsearchSinkFunction, failureHandler);
+		/**
+		 * Creates the Elasticsearch sink.
+		 *
+		 * @return the created Elasticsearch sink.
+		 */
+		public ElasticsearchSink<T> build() {
+			return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
+		}
 	}
 }
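
The builder above replaces the removed public constructors. For reference, a minimal usage sketch, mirroring the updated Elasticsearch6SinkExample further down in this series; the host, index and type names, and the source DataStream<String> are illustrative assumptions, not part of this change:

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));   // assumed local node

    ElasticsearchSink.Builder<String> sinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        (String element, RuntimeContext ctx, RequestIndexer indexer) ->
            indexer.add(Requests.indexRequest()
                .index("my-index")                            // illustrative index name
                .type("my-type")                              // illustrative type name
                .source(Collections.singletonMap("data", element))));

    // flush after every element; otherwise requests are buffered by the BulkProcessor
    sinkBuilder.setBulkFlushMaxActions(1);

    source.addSink(sinkBuilder.build());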
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
new file mode 100644
index 0000000..4b74649
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Serializable;
+
+/**
+ * A factory that is used to configure the {@link org.elasticsearch.client.RestHighLevelClient} internally
+ * used in the {@link ElasticsearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+	/**
+	 * Configures the rest client builder.
+	 *
+	 * @param restClientBuilder the rest client builder to configure.
+	 */
+	void configureRestClientBuilder(RestClientBuilder restClientBuilder);
+
+}
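
A user supplies this factory through ElasticsearchSink.Builder#setRestClientFactory. A short sketch of a custom factory, assuming a cluster fronted by a reverse proxy; the path prefix and header values are illustrative, while setPathPrefix and setDefaultHeaders are existing RestClientBuilder methods, and Header/BasicHeader come from org.apache.http:

    sinkBuilder.setRestClientFactory(restClientBuilder -> {
        restClientBuilder.setPathPrefix("/es");               // assumed proxy path prefix
        restClientBuilder.setDefaultHeaders(new Header[] {
            new BasicHeader("Content-Type", "application/json")});
    });

Because RestClientFactory extends Serializable, a lambda like the one above can be shipped with the sink to the task managers.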
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
index f419b41..8dc6216 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.InternalSettingsPreparer;
 import org.elasticsearch.node.Node;
@@ -44,11 +43,9 @@ public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElastic
 		if (node == null) {
 			Settings settings = Settings.builder()
 				.put("cluster.name", clusterName)
-				.put("http.enabled", false)
+				.put("http.enabled", true)
 				.put("path.home", tmpDataFolder.getParent())
 				.put("path.data", tmpDataFolder.getAbsolutePath())
-				.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
-				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
 				.build();
 
 			node = new PluginNode(settings);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
index 2170771..a6f0125 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -19,134 +19,82 @@
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
-import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 
 import org.apache.http.HttpHost;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.Test;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE directly, because the Log4J-to-SLF4J
+ * adapter dependency must be excluded from the test classpath for the Elasticsearch embedded node used in
+ * the tests to work properly.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
-
-	/**
-	 * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
-	 */
-	public void runTransportClientTest() throws Exception {
-		final String index = "transport-client-test-index";
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
-			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
-
-		env.execute("Elasticsearch RestHighLevelClient Test");
-
-		// verify the results
-		Client client = embeddedNodeEnv.getClient();
-		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {
 
-		client.close();
+	@Test
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
-	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
-	 */
-	public void runNullTransportClientTest() throws Exception {
-		try {
-			Map<String, String> userConfig = new HashMap<>();
-			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-			createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
-			// test passes
-			return;
-		}
-
-		fail();
+	@Test
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
-	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
-	 */
-	public void runEmptyTransportClientTest() throws Exception {
-		try {
-			Map<String, String> userConfig = new HashMap<>();
-			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-			createElasticsearchSink6(userConfig,
-				Collections.<HttpHost>emptyList(),
-				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
-			// test passes
-			return;
-		}
-
-		fail();
+	@Test
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
-	/**
-	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
-	 */
-	public void runTransportClientFailsTest() throws Exception {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+	@Test
+	public void testInvalidElasticsearchCluster() throws Exception {
+		runInvalidElasticsearchClusterTest();
+	}
 
-		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
-			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<HttpHost> httpHosts,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
 
-		try {
-			env.execute("Elasticsearch Transport Client Test");
-		} catch (JobExecutionException expectedException) {
-			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
-			return;
-		}
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+		builder.setBulkFlushMaxActions(bulkFlushMaxActions);
 
-		fail();
+		return builder.build();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return null;
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
+
 		ArrayList<HttpHost> httpHosts = new ArrayList<>();
-		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
-	}
+		httpHosts.add(new HttpHost(ipAddress, 9200, "http"));
+
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+		builder.setBulkFlushMaxActions(bulkFlushMaxActions);
 
-	private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
-		Map<String, String> userConfig,
-		List<HttpHost> httpHosts,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+		return builder.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index de1670f..0000000
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch6.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
-
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<HttpHost> httpHosts = new ArrayList<>();
-		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
index 2055184..fcd8654 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -22,6 +22,3 @@ log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index 5544ea5..dedcbb2 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 
@@ -62,19 +61,17 @@ public class Elasticsearch6SinkExample {
 				}
 			});
 
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
 		List<HttpHost> httpHosts = new ArrayList<>();
 		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 
-		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element, parameterTool));
-			}
-		}));
+		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+			httpHosts,
+			(String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+
+		// this instructs the sink to emit after every element, otherwise they would be buffered
+		esSinkBuilder.setBulkFlushMaxActions(1);
+
+		source.addSink(esSinkBuilder.build());
 
 		env.execute("Elasticsearch 6.x end to end sink test example");
 	}
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 7b627fe..fa6c331 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -42,15 +42,22 @@ function setup_elasticsearch {
 }
 
 function verify_elasticsearch_process_exist {
-    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
-
-    # make sure the elasticsearch node is actually running
-    if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
-      echo "Elasticsearch node is not running."
-      exit 1
-    else
-      echo "Elasticsearch node is running."
-    fi
+    for ((i=1;i<=10;i++)); do
+        local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
+
+        echo "Waiting for Elasticsearch node to start ..."
+
+        # make sure the elasticsearch node is actually running
+        if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
+            sleep 1
+        else
+            echo "Elasticsearch node is running."
+            return
+        fi
+    done
+
+    echo "Elasticsearch node did not start properly"
+    exit 1
 }
 
 function verify_result {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 7464409..c8cd2db 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -32,9 +32,6 @@ start_cluster
 
 function test_cleanup {
   shutdown_elasticsearch_cluster index
-
-  # make sure to run regular cleanup as well
-   cleanup
 }
 
 trap test_cleanup INT
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index c4620c8..ae5e58c 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\
 flink-connectors/flink-connector-elasticsearch,\
 flink-connectors/flink-connector-elasticsearch2,\
 flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch6,\
 flink-connectors/flink-connector-elasticsearch-base,\
 flink-connectors/flink-connector-filesystem,\
 flink-connectors/flink-connector-kafka-0.8,\


[flink] 05/06: [FLINK-9885] [tests] Add Elasticsearch 6.x end-to-end test

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 157db4a71a26d1d8bd2c4de68c48913160ff6b3f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jul 23 17:51:31 2018 +0800

    [FLINK-9885] [tests] Add Elasticsearch 6.x end-to-end test
---
 .../flink-elasticsearch6-test/pom.xml              | 92 ++++++++++++++++++++++
 .../streaming/tests/Elasticsearch6SinkExample.java | 92 ++++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |  1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |  1 +
 4 files changed, 186 insertions(+)

diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
new file mode 100644
index 0000000..d170235
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-elasticsearch6-test</artifactId>
+	<name>flink-elasticsearch6-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>Elasticsearch6SinkExample</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch6SinkExample</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
new file mode 100644
index 0000000..5544ea5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch6Sink.
+ */
+public class Elasticsearch6SinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 3) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, parameterTool));
+			}
+		}));
+
+		env.execute("Elasticsearch 6.x end to end sink test example");
+	}
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(parameterTool.getRequired("index"))
+			.type(parameterTool.getRequired("type"))
+			.id(element)
+			.source(json);
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index d48819d..162ca66 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -48,6 +48,7 @@ under the License.
 		<module>flink-elasticsearch1-test</module>
 		<module>flink-elasticsearch2-test</module>
 		<module>flink-elasticsearch5-test</module>
+		<module>flink-elasticsearch6-test</module>
 		<module>flink-quickstart-test</module>
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d06e80c..b4f3789 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -101,6 +101,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
 
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"


[flink] 02/06: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 208057654f80aa303fc69938264d71d729b02cf8
Author: Thomas Weise <th...@apache.org>
AuthorDate: Wed Jul 25 21:44:29 2018 -0700

    [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
    
    This closes #6427.
---
 .../kinesis/internals/KinesisDataFetcher.java      |  46 ++++++--
 .../kinesis/internals/ShardConsumer.java           | 121 ++++++++++-----------
 .../streaming/connectors/kinesis/util/AWSUtil.java |   4 +-
 .../connectors/kinesis/util/TimeoutLatch.java      |   3 +
 .../testutils/TestableKinesisDataFetcher.java      |   2 +-
 5 files changed, 104 insertions(+), 72 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 65de24c..13de032 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -163,6 +163,9 @@ public class KinesisDataFetcher<T> {
 	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads. */
 	private final AtomicReference<Throwable> error;
 
+	/** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */
+	private final FlinkKinesisProxyFactory kinesisProxyFactory;
+
 	/** The Kinesis proxy that the fetcher will be using to discover new shards. */
 	private final KinesisProxyInterface kinesis;
 
@@ -180,6 +183,13 @@ public class KinesisDataFetcher<T> {
 	private volatile boolean running = true;
 
 	/**
+	 * Factory to create Kinesis proxy instances used by a fetcher.
+	 */
+	public interface FlinkKinesisProxyFactory {
+		KinesisProxyInterface create(Properties configProps);
+	}
+
+	/**
 	 * Creates a Kinesis Data Fetcher.
 	 *
 	 * @param streams the streams to subscribe to
@@ -204,7 +214,7 @@ public class KinesisDataFetcher<T> {
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-			KinesisProxy.create(configProps));
+			KinesisProxy::create);
 	}
 
 	@VisibleForTesting
@@ -218,7 +228,7 @@ public class KinesisDataFetcher<T> {
 								AtomicReference<Throwable> error,
 								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
-								KinesisProxyInterface kinesis) {
+								FlinkKinesisProxyFactory kinesisProxyFactory) {
 		this.streams = checkNotNull(streams);
 		this.configProps = checkNotNull(configProps);
 		this.sourceContext = checkNotNull(sourceContext);
@@ -228,7 +238,8 @@ public class KinesisDataFetcher<T> {
 		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 		this.deserializationSchema = checkNotNull(deserializationSchema);
 		this.shardAssigner = checkNotNull(shardAssigner);
-		this.kinesis = checkNotNull(kinesis);
+		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
+		this.kinesis = kinesisProxyFactory.create(configProps);
 
 		this.consumerMetricGroup = runtimeContext.getMetricGroup()
 			.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
@@ -242,6 +253,29 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
+	 * Creates a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @return shard consumer
+	 */
+	protected ShardConsumer createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		ShardMetricsReporter shardMetricsReporter) {
+		return new ShardConsumer<>(
+			this,
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			this.kinesisProxyFactory.create(configProps),
+			shardMetricsReporter);
+	}
+
+	/**
 	 * Starts the fetcher. After starting the fetcher, it can only
 	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
 	 *
@@ -297,8 +331,7 @@ public class KinesisDataFetcher<T> {
 					}
 
 				shardConsumersExecutor.submit(
-					new ShardConsumer<>(
-						this,
+					createShardConsumer(
 						seededStateIndex,
 						subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
 						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
@@ -344,8 +377,7 @@ public class KinesisDataFetcher<T> {
 				}
 
 				shardConsumersExecutor.submit(
-					new ShardConsumer<>(
-						this,
+					createShardConsumer(
 						newStateIndex,
 						newShardState.getStreamShardHandle(),
 						newShardState.getLastProcessedSequenceNum(),
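
The protected createShardConsumer factory method added above is the override hook referred to in the commit title. A hedged sketch of a subclass using it; the class name is hypothetical, and constructors delegating to KinesisDataFetcher are omitted for brevity:

    public class CustomKinesisDataFetcher<T> extends KinesisDataFetcher<T> {

        // constructors delegating to KinesisDataFetcher omitted

        @Override
        protected ShardConsumer createShardConsumer(
                Integer subscribedShardStateIndex,
                StreamShardHandle subscribedShard,
                SequenceNumber lastSequenceNum,
                ShardMetricsReporter shardMetricsReporter) {
            // decorate or replace the default consumer here; this sketch just delegates
            return super.createShardConsumer(
                subscribedShardStateIndex, subscribedShard, lastSequenceNum, shardMetricsReporter);
        }
    }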
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index b14c6a4..d698ecf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -24,7 +24,6 @@ import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporte
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
@@ -87,28 +86,15 @@ public class ShardConsumer<T> implements Runnable {
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param kinesis the proxy instance to interact with Kinesis
 	 * @param shardMetricsReporter the reporter to report metrics to
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 						Integer subscribedShardStateIndex,
 						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum,
+						KinesisProxyInterface kinesis,
 						ShardMetricsReporter shardMetricsReporter) {
-		this(fetcherRef,
-			subscribedShardStateIndex,
-			subscribedShard,
-			lastSequenceNum,
-			KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
-			shardMetricsReporter);
-	}
-
-	/** This constructor is exposed for testing purposes. */
-	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-							Integer subscribedShardStateIndex,
-							StreamShardHandle subscribedShard,
-							SequenceNumber lastSequenceNum,
-							KinesisProxyInterface kinesis,
-							ShardMetricsReporter shardMetricsReporter) {
 		this.fetcherRef = checkNotNull(fetcherRef);
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
@@ -152,62 +138,73 @@ public class ShardConsumer<T> implements Runnable {
 		}
 	}
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public void run() {
+	/**
+	 * Finds the initial shard iterator to start getting records from.
+	 * @return the initial shard iterator to consume from
+	 * @throws Exception if the initial shard iterator could not be determined
+	 */
+	protected String getInitialShardIterator() throws Exception {
 		String nextShardItr;
 
-		try {
-			// before infinitely looping, we set the initial nextShardItr appropriately
+		// before infinitely looping, we set the initial nextShardItr appropriately
 
-			if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
-				// if the shard is already closed, there will be no latest next record to get for this shard
-				if (subscribedShard.isClosed()) {
-					nextShardItr = null;
-				} else {
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
-				}
-			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
-			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+			// if the shard is already closed, there will be no latest next record to get for this shard
+			if (subscribedShard.isClosed()) {
 				nextShardItr = null;
-			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
 			} else {
-				// we will be starting from an actual sequence number (due to restore from failure).
-				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
-				// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
-				if (lastSequenceNum.isAggregated()) {
-					String itrForLastAggregatedRecord =
-						kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
-					// get only the last aggregated record
-					GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-					long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
-					for (UserRecord record : fetchedRecords) {
-						// we have found a dangling sub-record if it has a larger subsequence number
-						// than our last sequence number; if so, collect the record and update state
-						if (record.getSubSequenceNumber() > lastSubSequenceNum) {
-							deserializeRecordForCollectionAndUpdateState(record);
-						}
+				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+			}
+		} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
+		} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+			nextShardItr = null;
+		} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
+		} else {
+			// we will be starting from an actual sequence number (due to restore from failure).
+			// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
+			// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
+
+			if (lastSequenceNum.isAggregated()) {
+				String itrForLastAggregatedRecord =
+					kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+				// get only the last aggregated record
+				GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
+
+				List<UserRecord> fetchedRecords = deaggregateRecords(
+					getRecordsResult.getRecords(),
+					subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+					subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+				long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
+				for (UserRecord record : fetchedRecords) {
+					// a sub-record is dangling if its sub-sequence number is larger than the
+					// restored sub-sequence number; if so, collect the record and update state
+					if (record.getSubSequenceNumber() > lastSubSequenceNum) {
+						deserializeRecordForCollectionAndUpdateState(record);
 					}
-
-					// set the nextShardItr so we can continue iterating in the next while loop
-					nextShardItr = getRecordsResult.getNextShardIterator();
-				} else {
-					// the last record was non-aggregated, so we can simply start from the next record
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
 				}
+
+				// set the nextShardItr so we can continue iterating in the next while loop
+				nextShardItr = getRecordsResult.getNextShardIterator();
+			} else {
+				// the last record was non-aggregated, so we can simply start from the next record
+				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
 			}
+		}
+		return nextShardItr;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void run() {
+		try {
+			String nextShardItr = getInitialShardIterator();
 
 			long processingStartTimeNanos = System.nanoTime();
+
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
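
[Editor's note] The shard-iterator selection above was extracted into getInitialShardIterator(), and its aggregated-record path re-reads the last aggregated record and keeps only sub-records past the restored sub-sequence number. A minimal sketch of that filter, assuming the KCL's UserRecord is on the classpath; the helper class and method names are hypothetical, for illustration only:

import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;

import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring the dangling sub-record check in getInitialShardIterator(). */
final class AggregatedResumeSketch {

	/** Keeps only sub-records whose sub-sequence number lies past the restored one. */
	static List<UserRecord> danglingSubRecords(List<UserRecord> deaggregated, long lastSubSequenceNum) {
		return deaggregated.stream()
			.filter(record -> record.getSubSequenceNumber() > lastSubSequenceNum)
			.collect(Collectors.toList());
	}
}
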
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 9e5c6cb..e25a601 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -36,14 +36,14 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
 import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.DeserializerFactory;
-import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
-import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 
 import java.io.IOException;
 import java.util.HashMap;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
index 4dcab33..49a9ed7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
 
+/**
+ * Internal use.
+ */
 @Internal
 public class TimeoutLatch {
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b7cfb2d..21588c9 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -72,7 +72,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-			fakeKinesis);
+			(properties) -> fakeKinesis);
 
 		this.runWaiter = new OneShotLatch();
 		this.initialDiscoveryWaiter = new OneShotLatch();

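[Editor's note] The TestableKinesisDataFetcher change reflects that the fetcher constructor now takes a factory for the Kinesis proxy rather than a ready-made proxy, so the test injects its mock by wrapping it in a lambda. The functional shape below is only a sketch, inferred from the (properties) -> fakeKinesis lambda above:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

/** Assumed shape of the factory the fetcher now accepts; a lambda or mock satisfies it. */
interface KinesisProxyFactorySketch {
	KinesisProxyInterface create(Properties configProps);
}
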

[flink] 03/06: [FLINK-7386] [elasticsearch] Evolve ES connector API to make it working with Elasticsearch 5.3+

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cea6c9a04b28536f6f9a68c6ae81c36177756730
Author: Christophe Jolif <cj...@gmail.com>
AuthorDate: Fri May 18 00:17:04 2018 +0200

    [FLINK-7386] [elasticsearch] Evolve ES connector API to make it working with Elasticsearch 5.3+
    
    This closes #6043.
---
 .../elasticsearch/BulkProcessorIndexer.java        | 29 +++++++++++++--
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 18 ++++++----
 .../elasticsearch/ElasticsearchSinkBase.java       |  4 +--
 .../connectors/elasticsearch/RequestIndexer.java   | 42 ++++++++++++++++++++--
 .../elasticsearch/ElasticsearchSinkBaseTest.java   | 37 +++++++++----------
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |  9 +++--
 .../Elasticsearch2ApiCallBridge.java               | 15 ++++----
 .../Elasticsearch5ApiCallBridge.java               | 15 ++++----
 8 files changed, 119 insertions(+), 50 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 2ebb97c..33b42cb 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,12 +48,32 @@ class BulkProcessorIndexer implements RequestIndexer {
 	}
 
 	@Override
-	public void add(ActionRequest... actionRequests) {
-		for (ActionRequest actionRequest : actionRequests) {
+	public void add(DeleteRequest... deleteRequests) {
+		for (DeleteRequest deleteRequest : deleteRequests) {
 			if (flushOnCheckpoint) {
 				numPendingRequestsRef.getAndIncrement();
 			}
-			this.bulkProcessor.add(actionRequest);
+			this.bulkProcessor.add(deleteRequest);
+		}
+	}
+
+	@Override
+	public void add(IndexRequest... indexRequests) {
+		for (IndexRequest indexRequest : indexRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(indexRequest);
+		}
+	}
+
+	@Override
+	public void add(UpdateRequest... updateRequests) {
+		for (UpdateRequest updateRequest : updateRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(updateRequest);
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 2a7a216..1c501bf 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
 
@@ -39,15 +38,18 @@ import java.util.Map;
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
 	/**
-	 * Creates an Elasticsearch {@link Client}.
+	 * Creates an Elasticsearch client implementing {@link AutoCloseable}. This can
+	 * be a {@link org.elasticsearch.client.Client} or a {@link org.elasticsearch.client.RestHighLevelClient}.
 	 *
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
 	 */
-	Client createClient(Map<String, String> clientConfig);
+	public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+
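+	/**
+	 * Creates a {@link BulkProcessor.Builder} that sends bulk requests through the given client.
+	 */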
+	public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
 
 	/**
 	 * Extracts the cause of failure of a bulk item action.
@@ -55,7 +57,7 @@ public interface ElasticsearchApiCallBridge extends Serializable {
 	 * @param bulkItemResponse the bulk item response to extract cause of failure
 	 * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful).
 	 */
-	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+	public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
 	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -64,13 +66,15 @@ public interface ElasticsearchApiCallBridge extends Serializable {
 	 * @param builder the {@link BulkProcessor.Builder} to configure.
 	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
 	 */
-	void configureBulkProcessorBackoff(
+	public abstract void configureBulkProcessorBackoff(
 		BulkProcessor.Builder builder,
 		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
 	/**
 	 * Perform any necessary state cleanup.
 	 */
-	void cleanup();
+	public void cleanup() {
+		// nothing to cleanup by default
+	}
 
 }
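
[Editor's note] Taken together, the bridge is now an abstract base class rather than an interface. A hedged sketch of what a version-specific subclass looks like after this change; the class name is made up, and the builder call mirrors the transport-client bridges further down:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;

import javax.annotation.Nullable;

import java.util.Map;

public class SketchApiCallBridge extends ElasticsearchApiCallBridge {

	private static final long serialVersionUID = 1L;

	@Override
	public AutoCloseable createClient(Map<String, String> clientConfig) {
		// construct the version-specific client here (e.g. a TransportClient); elided in this sketch
		throw new UnsupportedOperationException("client construction elided");
	}

	@Override
	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
		// transport-client based versions can simply cast and delegate
		return BulkProcessor.builder((Client) client, listener);
	}

	@Override
	public @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
		return bulkItemResponse.isFailed()
			? new RuntimeException(bulkItemResponse.getFailureMessage())
			: null;
	}

	@Override
	public void configureBulkProcessorBackoff(
			BulkProcessor.Builder builder,
			@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
		// apply a matching org.elasticsearch.action.bulk.BackoffPolicy here if retries are enabled
	}

	// cleanup() is inherited as a no-op from the base class
}
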
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9105d99..0305ee3d 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -176,7 +176,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	private AtomicLong numPendingRequests = new AtomicLong(0);
 
 	/** Elasticsearch client created using the call bridge. */
-	private transient Client client;
+	private transient AutoCloseable client;
 
 	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
 	private transient BulkProcessor bulkProcessor;
@@ -341,7 +341,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 		checkNotNull(listener);
 
-		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
+		BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
 
 		// This makes flush() blocking
 		bulkProcessorBuilder.setConcurrentRequests(0);
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 2a1b297..3dc8f87 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -21,9 +21,12 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
@@ -33,6 +36,41 @@ public interface RequestIndexer {
 	 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 	 *
 	 * @param actionRequests The multiple {@link ActionRequest} to add.
+	 * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest} overloads instead
 	 */
-	void add(ActionRequest... actionRequests);
+	@Deprecated
+	default void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			if (actionRequest instanceof IndexRequest) {
+				add((IndexRequest) actionRequest);
+			} else if (actionRequest instanceof DeleteRequest) {
+				add((DeleteRequest) actionRequest);
+			} else if (actionRequest instanceof UpdateRequest) {
+				add((UpdateRequest) actionRequest);
+			} else {
+				throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+			}
+		}
+	}
+
+	/**
+	 * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param deleteRequests The multiple {@link DeleteRequest} to add.
+	 */
+	void add(DeleteRequest... deleteRequests);
+
+	/**
+	 * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param indexRequests The multiple {@link IndexRequest} to add.
+	 */
+	void add(IndexRequest... indexRequests);
+
+	/**
+	 * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to Elasticsearch.
+	 *
+	 * @param updateRequests The multiple {@link UpdateRequest} to add.
+	 */
+	void add(UpdateRequest... updateRequests);
 }
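
[Editor's note] With the typed add(...) overloads in place, user sink functions can hand typed requests to the indexer and avoid the deprecated add(ActionRequest...) path. A minimal usage sketch; the index and type names are made up:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Collections;

public class TypedIndexingSinkFunction implements ElasticsearchSinkFunction<String> {

	@Override
	public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
		IndexRequest request = Requests.indexRequest()
			.index("my-index")    // hypothetical index name
			.type("my-type")      // hypothetical type name
			.source(Collections.singletonMap("data", element));

		// statically resolves to the new add(IndexRequest...) overload
		indexer.add(request);
	}
}
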
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 09d8806..5a161a7 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.junit.Assert;
@@ -92,7 +93,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and its mock item failures
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -124,7 +125,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and its mock item failures
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -164,7 +165,7 @@ public class ElasticsearchSinkBaseTest {
 		sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
 
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request (1 request only, thus should succeed)
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -172,7 +173,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the requests to be flushed in the snapshot
 		testHarness.processElement(new StreamRecord<>("msg-2"));
 		testHarness.processElement(new StreamRecord<>("msg-3"));
-		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -217,7 +218,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and let the whole bulk request fail
 		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -249,7 +250,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and let the whole bulk request fail
 		sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -284,7 +285,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and let bulk request succeed
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// manually execute the next bulk request
 		sink.manualBulkRequestWithAllPendingRequests();
@@ -292,7 +293,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the requests to be flushed in the snapshot
 		testHarness.processElement(new StreamRecord<>("msg-2"));
 		testHarness.processElement(new StreamRecord<>("msg-3"));
-		verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -346,7 +347,7 @@ public class ElasticsearchSinkBaseTest {
 		// it contains 1 request, which will fail and re-added to the next bulk request
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
@@ -402,7 +403,7 @@ public class ElasticsearchSinkBaseTest {
 		// setup the next bulk request, and let bulk request succeed
 		sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
 		testHarness.processElement(new StreamRecord<>("msg-1"));
-		verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+		verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class));
 
 		// the snapshot should not block even though we haven't flushed the bulk request
 		testHarness.snapshot(1L, 1000L);
@@ -478,11 +479,11 @@ public class ElasticsearchSinkBaseTest {
 		protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
 			this.mockBulkProcessor = mock(BulkProcessor.class);
 
-			when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+			when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new Answer<Object>() {
 				@Override
 				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 					// intercept the request and add it to our mock bulk request
-					nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+					nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class));
 
 					return null;
 				}
@@ -530,12 +531,12 @@ public class ElasticsearchSinkBaseTest {
 		}
 	}
 
-	private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+	private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge {
 
 		private static final long serialVersionUID = -4272760730959041699L;
 
 		@Override
-		public Client createClient(Map<String, String> clientConfig) {
+		public AutoCloseable createClient(Map<String, String> clientConfig) {
 			return mock(Client.class);
 		}
 
@@ -550,13 +551,13 @@ public class ElasticsearchSinkBaseTest {
 		}
 
 		@Override
-		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
-			// no need for this in the test cases here
+		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+			return null;
 		}
 
 		@Override
-		public void cleanup() {
-			// nothing to cleanup
+		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+			// no need for this in the test cases here
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 2a3c2a0..6f49206 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = -2632363720584123682L;
 
@@ -70,7 +70,7 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		if (transportAddresses == null) {
 
 			// Make sure that we disable http access to our embedded node
@@ -116,6 +116,11 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
+	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
 			return null;
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 390a407..80c1b3a 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -44,7 +44,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +63,7 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
 
 		TransportClient transportClient = TransportClient.builder().settings(settings).build();
@@ -84,6 +84,11 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
+	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
 			return null;
@@ -117,10 +122,4 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
 
 		builder.setBackoffPolicy(backoffPolicy);
 	}
-
-	@Override
-	public void cleanup() {
-		// nothing to cleanup
-	}
-
 }
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 7c4ba7a..1e73feb 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -47,7 +47,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +66,7 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public Client createClient(Map<String, String> clientConfig) {
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.builder().put(clientConfig)
 			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
 			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -90,6 +90,11 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 	}
 
 	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder((Client) client, listener);
+	}
+
+	@Override
 	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
 		if (!bulkItemResponse.isFailed()) {
 			return null;
@@ -123,10 +128,4 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
 
 		builder.setBackoffPolicy(backoffPolicy);
 	}
-
-	@Override
-	public void cleanup() {
-		// nothing to cleanup
-	}
-
 }