You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/01 13:17:49 UTC

[flink] 06/06: [FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abbd6b02d743486f3c0c1336139dd6b3edd20840
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 17:21:34 2018 +0800

    [FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector
    
    This closes #6391.
---
 docs/dev/connectors/elasticsearch.md               | 192 ++++++++++++++++++---
 .../elasticsearch/ElasticsearchApiCallBridge.java  |  22 ++-
 .../elasticsearch/ElasticsearchSinkBase.java       |  24 ++-
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |   8 +-
 .../elasticsearch/ElasticsearchSinkTestBase.java   | 103 +++++++----
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |  10 +-
 .../elasticsearch/ElasticsearchSink.java           |   3 +-
 .../elasticsearch/ElasticsearchSinkITCase.java     |  59 +++++--
 .../Elasticsearch2ApiCallBridge.java               |   9 +-
 .../elasticsearch2/ElasticsearchSink.java          |   2 +-
 .../elasticsearch2/ElasticsearchSinkITCase.java    |  60 +++++--
 .../Elasticsearch5ApiCallBridge.java               |   9 +-
 .../elasticsearch5/ElasticsearchSink.java          |   2 +-
 .../elasticsearch5/ElasticsearchSinkITCase.java    |  65 +++++--
 .../flink-connector-elasticsearch6/pom.xml         |   6 +-
 .../Elasticsearch6ApiCallBridge.java               |  35 +++-
 .../elasticsearch6/ElasticsearchSink.java          | 168 +++++++++++++++---
 .../elasticsearch6/RestClientFactory.java          |  40 +++++
 .../EmbeddedElasticsearchNodeEnvironmentImpl.java  |   5 +-
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 144 +++++-----------
 .../examples/ElasticsearchSinkExample.java         |  81 ---------
 .../src/test/resources/log4j-test.properties       |   3 -
 .../streaming/tests/Elasticsearch6SinkExample.java |  19 +-
 .../test-scripts/elasticsearch-common.sh           |  25 ++-
 .../test-scripts/test_streaming_elasticsearch.sh   |   3 -
 tools/travis_mvn_watchdog.sh                       |   1 +
 26 files changed, 703 insertions(+), 395 deletions(-)

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 20a7d71..bafe391 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -84,6 +84,23 @@ The example below shows how to configure and create a sink:
 <div class="codetabs" markdown="1">
 <div data-lang="java, Elasticsearch 1.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -115,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 </div>
 <div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 Map<String, String> config = new HashMap<>();
@@ -145,31 +178,83 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
 </div>
 <div data-lang="java, Elasticsearch 6.x" markdown="1">
 {% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 DataStream<String> input = ...;
 
 List<HttpHost> httpHost = new ArrayList<>();
 httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
 
-input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
-    public IndexRequest createIndexRequest(String element) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
-    
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-    
-    @Override
-    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-        indexer.add(createIndexRequest(element));
+// use a ElasticsearchSink.Builder to create an ElasticsearchSink
+ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+    httpHosts,
+    new ElasticsearchSinkFunction<String>() {
+        public IndexRequest createIndexRequest(String element) {
+            Map<String, String> json = new HashMap<>();
+            json.put("data", element);
+        
+            return Requests.indexRequest()
+                    .index("my-index")
+                    .type("my-type")
+                    .source(json);
+        }
+        
+        @Override
+        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+            indexer.add(createIndexRequest(element));
+        }
     }
-}));{% endhighlight %}
+);
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+builder.setBulkFlushMaxActions(1);
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+builder.setRestClientBuilder(
+  restClientBuilder -> {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+);
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build());
+{% endhighlight %}
 </div>
 <div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.common.transport.TransportAddress
+
+import java.net.InetAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -196,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 </div>
 <div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
 val input: DataStream[String] = ...
 
 val config = new java.util.HashMap[String, String]
@@ -222,33 +323,72 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 </div>
 <div data-lang="scala, Elasticsearch 6.x" markdown="1">
 {% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
+
+import org.apache.http.HttpHost
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.util.ArrayList
+import java.util.List
+
 val input: DataStream[String] = ...
 
 val httpHosts = new java.util.ArrayList[HttpHost]
 httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
 httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
 
-input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new java.util.HashMap[String, String]
-    json.put("data", element)
-    
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json)
+val esSinkBuilder = new ElasticsearchSink.Builer[String](
+  httpHosts,
+  new ElasticsearchSinkFunction[String] {
+    def createIndexRequest(element: String): IndexRequest = {
+      val json = new java.util.HashMap[String, String]
+      json.put("data", element)
+      
+      return Requests.indexRequest()
+              .index("my-index")
+              .type("my-type")
+              .source(json)
+    }
   }
-}))
+)
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+builder.setBulkFlushMaxActions(1)
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+builder.setRestClientBuilder(
+  restClientBuilder -> {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+)
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build)
 {% endhighlight %}
 </div>
 </div>
 
-Note how `TransportClient` based version use a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+For Elasticsearch versions that still uses the now deprecated `TransportClient` to communicate
+with the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a `Map` of `String`s
+is used to configure the `ElasticsearchSink`. This config map will be directly
+forwarded when creating the internally used `TransportClient`.
 The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
 the name of your cluster.
 
+For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
+By default, the connector uses the default configurations for the REST client. To have custom
+configuration for the REST client, users can provide a `RestClientFactory` implementation when 
+setting up the `ElasticsearchClient.Builder` that builds the sink.
+
 Also note that the example only demonstrates performing a single index
 request for each incoming element. Generally, the `ElasticsearchSinkFunction`
 can be used to perform multiple requests of different types (ex.,
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 90d84f3..f1dcc83 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkProcessor;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
@@ -36,9 +37,11 @@ import java.util.Map;
  * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
  * is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
  * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
  */
 @Internal
-public abstract class ElasticsearchApiCallBridge implements Serializable {
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
 
 	/**
 	 * Creates an Elasticsearch client implementing {@link AutoCloseable}.
@@ -46,9 +49,16 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
 	 */
-	public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+	C createClient(Map<String, String> clientConfig) throws IOException;
 
-	public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
+	/**
+	 * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+	 *
+	 * @param client the Elasticsearch client.
+	 * @param listener the bulk processor listender.
+	 * @return the bulk processor builder.
+	 */
+	BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
 
 	/**
 	 * Extracts the cause of failure of a bulk item action.
@@ -56,7 +66,7 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param bulkItemResponse the bulk item response to extract cause of failure
 	 * @return the extracted {@link Throwable} from the response ({@code null} is the response is successful).
 	 */
-	public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+	@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
 	/**
 	 * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -65,14 +75,14 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
 	 * @param builder the {@link BulkProcessor.Builder} to configure.
 	 * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
 	 */
-	public abstract void configureBulkProcessorBackoff(
+	void configureBulkProcessorBackoff(
 		BulkProcessor.Builder builder,
 		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
 	/**
 	 * Perform any necessary state cleanup.
 	 */
-	public void cleanup() {
+	default void cleanup() {
 		// nothing to cleanup by default
 	}
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9830484..7dac06c 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +33,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -58,12 +60,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
  * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client} or {@RestHighLevelClient}, handle failed item responses, etc.
+ * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
  *
  * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
  */
 @Internal
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = -1007596293618451942L;
 
@@ -84,6 +87,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	/**
 	 * Used to control whether the retry delay should increase exponentially or remain constant.
 	 */
+	@PublicEvolving
 	public enum FlushBackoffType {
 		CONSTANT,
 		EXPONENTIAL
@@ -134,14 +138,20 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 
 	private final Integer bulkProcessorFlushMaxActions;
 	private final Integer bulkProcessorFlushMaxSizeMb;
-	private final Integer bulkProcessorFlushIntervalMillis;
+	private final Long bulkProcessorFlushIntervalMillis;
 	private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
 
 	// ------------------------------------------------------------------------
 	//  User-facing API and configuration
 	// ------------------------------------------------------------------------
 
-	/** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+	/**
+	 * The config map that contains configuration for the bulk flushing behaviours.
+	 *
+	 * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this config
+	 * map would also contain Elasticsearch-shipped configuration, and therefore this config map
+	 * would also be forwarded when creating the Elasticsearch client.
+	 */
 	private final Map<String, String> userConfig;
 
 	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
@@ -161,7 +171,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	// ------------------------------------------------------------------------
 
 	/** Call bridge for different version-specific. */
-	private final ElasticsearchApiCallBridge callBridge;
+	private final ElasticsearchApiCallBridge<C> callBridge;
 
 	/**
 	 * Number of pending action requests not yet acknowledged by Elasticsearch.
@@ -175,7 +185,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	private AtomicLong numPendingRequests = new AtomicLong(0);
 
 	/** Elasticsearch client created using the call bridge. */
-	private transient AutoCloseable client;
+	private transient C client;
 
 	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
 	private transient BulkProcessor bulkProcessor;
@@ -236,7 +246,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 		}
 
 		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
-			bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+			bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
 			userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
 		} else {
 			bulkProcessorFlushIntervalMillis = null;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 460e939..369d26a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -411,7 +411,7 @@ public class ElasticsearchSinkBaseTest {
 		testHarness.close();
 	}
 
-	private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+	private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
 		private static final long serialVersionUID = 5051907841570096991L;
 
@@ -531,17 +531,17 @@ public class ElasticsearchSinkBaseTest {
 		}
 	}
 
-	private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge {
+	private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
 		private static final long serialVersionUID = -4272760730959041699L;
 
 		@Override
-		public AutoCloseable createClient(Map<String, String> clientConfig) {
+		public Client createClient(Map<String, String> clientConfig) {
 			return mock(Client.class);
 		}
 
 		@Override
-		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
 			return null;
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index df3779b..819ffba 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -34,19 +33,20 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ *
+ * @param <C> Elasticsearch client type
+ * @param <A> The address type to use
  */
-public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
 
@@ -85,24 +85,21 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+	 * Tests that the Elasticsearch sink works properly.
 	 */
-	public void runTransportClientTest() throws Exception {
-		final String index = "transport-client-test-index";
+	public void runElasticsearchSinkTest() throws Exception {
+		final String index = "elasticsearch-sink-test-index";
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", CLUSTER_NAME);
-
 		source.addSink(createElasticsearchSinkForEmbeddedNode(
-			userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+				1,
+				CLUSTER_NAME,
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
 
-		env.execute("Elasticsearch TransportClient Test");
+		env.execute("Elasticsearch Sink Test");
 
 		// verify the results
 		Client client = embeddedNodeEnv.getClient();
@@ -112,16 +109,20 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code null}.
 	 */
-	public void runNullTransportClientTest() throws Exception {
+	public void runNullAddressesTest() throws Exception {
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", CLUSTER_NAME);
 
 		try {
-			createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
+			createElasticsearchSink(
+					1,
+					CLUSTER_NAME,
+					null,
+					new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException | NullPointerException expectedException) {
 			// test passes
 			return;
 		}
@@ -130,18 +131,19 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	}
 
 	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty.
 	 */
-	public void runEmptyTransportClientTest() throws Exception {
+	public void runEmptyAddressesTest() throws Exception {
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", CLUSTER_NAME);
 
 		try {
 			createElasticsearchSink(
-				userConfig,
-				Collections.<InetSocketAddress>emptyList(),
-				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+					1,
+					CLUSTER_NAME,
+					Collections.emptyList(),
+					new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
 		} catch (IllegalArgumentException expectedException) {
 			// test passes
 			return;
@@ -153,39 +155,66 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
 	/**
 	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
 	 */
-	public void runTransportClientFailsTest() throws Exception {
+	public void runInvalidElasticsearchClusterTest() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
 
 		Map<String, String> userConfig = new HashMap<>();
 		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-		userConfig.put("cluster.name", "my-transport-client-cluster");
+		userConfig.put("cluster.name", "invalid-cluster-name");
 
-		source.addSink(createElasticsearchSinkForEmbeddedNode(
-			Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+		source.addSink(createElasticsearchSinkForNode(
+				1,
+				"invalid-cluster-name",
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
+				"123.123.123.123")); // incorrect ip address
 
 		try {
-			env.execute("Elasticsearch Transport Client Test");
+			env.execute("Elasticsearch Sink Test");
 		} catch (JobExecutionException expectedException) {
-			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+			// test passes
 			return;
 		}
 
 		fail();
 	}
 
+	/**
+	 * Utility method to create a user config map.
+	 */
+	protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", clusterName);
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions));
+
+		return userConfig;
+	}
+
 	/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
-	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																			List<InetSocketAddress> transportAddresses,
-																			ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<A> addresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction);
 
 	/**
 	 * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
 	 *
-	 * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+	 * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List, ElasticsearchSinkFunction)}
 	 * because the Elasticsearch Java API to do so is incompatible across different versions.
 	 */
-	protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception;
+
+	/**
+	 * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node.
+	 */
+	protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception;
 }
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 28d5f34..4f1cd08 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> {
 
 	private static final long serialVersionUID = -2632363720584123682L;
 
@@ -70,7 +70,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public Client createClient(Map<String, String> clientConfig) {
 		if (transportAddresses == null) {
 
 			// Make sure that we disable http access to our embedded node
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 				.data(false)
 				.node();
 
-			AutoCloseable client = node.client();
+			Client client = node.client();
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Created Elasticsearch client from embedded node");
@@ -116,8 +116,8 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index e8eccd9..d5e1d1f 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.node.Node;
@@ -64,7 +65,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 5489290..2f1a65c 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -28,10 +28,12 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUti
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,26 +44,26 @@ import java.util.Map;
 /**
  * IT Cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception{
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception{
+		runInvalidElasticsearchClusterTest();
 	}
 
 	// -- Tests specific to Elasticsearch 1.x --
@@ -102,19 +104,28 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> transportAddresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
+				elasticsearchSinkFunction);
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		Map<String, String> userConfig = createUserConfig(bulkFlushMaxActions, clusterName);
 
 		// Elasticsearch 1.x requires this setting when using
 		// LocalTransportAddress to connect to a local embedded node
-		userConfig = new HashMap<>(userConfig);
 		userConfig.put("node.local", "true");
 
 		List<TransportAddress> transports = new ArrayList<>();
@@ -126,6 +137,22 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 			elasticsearchSinkFunction);
 	}
 
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
+
+		List<TransportAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300));
+
+		return new ElasticsearchSink<>(
+			Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+			transports,
+			elasticsearchSinkFunction);
+	}
+
 	/**
 	 * A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
 	 */
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 80c1b3a..73a69eb 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -44,7 +43,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
 
 	private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +62,7 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public TransportClient createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.settingsBuilder().put(clientConfig).build();
 
 		TransportClient transportClient = TransportClient.builder().settings(settings).build();
@@ -84,8 +83,8 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index ffccacf..a911905 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -58,7 +58,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 7ded893..7887e72 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -17,57 +17,81 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception{
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception{
+		runInvalidElasticsearchClusterTest();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> transportAddresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transportAddresses,
+				elasticsearchSinkFunction);
+	}
+
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
 
 		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+		transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transports,
+				elasticsearchSinkFunction);
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1e73feb..a3453ec 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
@@ -47,7 +46,7 @@ import java.util.Map;
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +65,7 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
+	public TransportClient createClient(Map<String, String> clientConfig) {
 		Settings settings = Settings.builder().put(clientConfig)
 			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
 			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -90,8 +89,8 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		return BulkProcessor.builder((Client) client, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 6c09337..b99b353 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -59,7 +59,7 @@ import java.util.Map;
  * @param <T> Type of the elements handled by this sink
  */
 @PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index ad7c664..67daa40 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -18,58 +18,85 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch5;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE directly, since it is required that the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch embedded
+ * node used in the tests to work properly.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
 
 	@Test
-	public void testTransportClient() throws Exception {
-		runTransportClientTest();
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
 	@Test
-	public void testNullTransportClient() throws Exception {
-		runNullTransportClientTest();
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
 	@Test
-	public void testEmptyTransportClient() throws Exception {
-		runEmptyTransportClientTest();
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
 	@Test
-	public void testTransportClientFails() throws Exception {
-		runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception{
+		runInvalidElasticsearchClusterTest();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-																List<InetSocketAddress> transportAddresses,
-																ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> addresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				addresses,
+				elasticsearchSinkFunction);
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-		Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
+	}
+
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
 
 		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+		transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
 
-		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+		return new ElasticsearchSink<>(
+				Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+				transports,
+				elasticsearchSinkFunction);
 	}
-
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index e453837..ef06d80 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -81,7 +81,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-to-slf4j</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -141,14 +141,14 @@ under the License.
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-api</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-core</artifactId>
-			<version>2.7</version>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 2cb4ea0..03bf9c0 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.util.Preconditions;
@@ -26,6 +27,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.Logger;
@@ -33,13 +35,15 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
  */
-public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+@Internal
+public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
 
 	private static final long serialVersionUID = -5222683870097809633L;
 
@@ -50,15 +54,31 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
 	 */
 	private final List<HttpHost> httpHosts;
 
-	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+	/**
+	 * The factory to configure the rest client.
+	 */
+	private final RestClientFactory restClientFactory;
+
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
 		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
 		this.httpHosts = httpHosts;
+		this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
 	}
 
 	@Override
-	public AutoCloseable createClient(Map<String, String> clientConfig) {
-		RestHighLevelClient rhlClient =
-			new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
+	public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
+		RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+		restClientFactory.configureRestClientBuilder(builder);
+
+		RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+		}
+
+		if (!rhlClient.ping()) {
+			throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+		}
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
@@ -68,9 +88,8 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
 	}
 
 	@Override
-	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-		RestHighLevelClient rhlClient = (RestHighLevelClient) client;
-		return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+	public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client::bulkAsync, listener);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 3f75b5f..4e7a263 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -17,17 +17,19 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.RestHighLevelClient;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,10 +40,6 @@ import java.util.Map;
  * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
  * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
  *
- * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
- * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
- * which should be set to the name of the cluster that the sink should emit to.
- *
  * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
  * This will buffer elements before sending a request to the cluster. The behaviour of the
  * {@code BulkProcessor} can be configured using these config keys:
@@ -58,34 +56,156 @@ import java.util.Map;
  *
  * @param <T> Type of the elements handled by this sink
  */
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
-	 *
-	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
-	 * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
-	 */
-	public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+	private ElasticsearchSink(
+		Map<String, String> bulkRequestsConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler,
+		RestClientFactory restClientFactory) {
 
-		this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+		super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),  bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
 	}
 
 	/**
-	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 * A builder for creating an {@link ElasticsearchSink}.
 	 *
-	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
-	 * @param failureHandler This is used to handle failed {@link ActionRequest}
-	 * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
+	 * @param <T> Type of the elements handled by the sink this builder creates.
 	 */
-	public ElasticsearchSink(
-		Map<String, String> userConfig,
-		List<HttpHost> httpHosts,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
-		ActionRequestFailureHandler failureHandler) {
+	@PublicEvolving
+	public static class Builder<T> {
+
+		private final List<HttpHost> httpHosts;
+		private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+		private Map<String, String> bulkRequestsConfig = new HashMap<>();
+		private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+		private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+		/**
+		 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+		 *
+		 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects to.
+		 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element.
+		 */
+		public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+			this.httpHosts = Preconditions.checkNotNull(httpHosts);
+			this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+		}
+
+		/**
+		 * Sets the maximum number of actions to buffer for each bulk request.
+		 *
+		 * @param numMaxActions the maxinum number of actions to buffer per bulk request.
+		 */
+		public void setBulkFlushMaxActions(int numMaxActions) {
+			Preconditions.checkArgument(
+				numMaxActions > 0,
+				"Max number of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+		}
+
+		/**
+		 * Sets the maximum size of buffered actions, in mb, per bulk request.
+		 *
+		 * @param maxSizeMb the maximum size of buffered actions, in mb.
+		 */
+		public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+			Preconditions.checkArgument(
+				maxSizeMb > 0,
+				"Max size of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+		}
+
+		/**
+		 * Sets the bulk flush interval, in milliseconds.
+		 *
+		 * @param intervalMillis the bulk flush interval, in milliseconds.
+		 */
+		public void setBulkFlushInterval(long intervalMillis) {
+			Preconditions.checkArgument(
+				intervalMillis >= 0,
+				"Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+		}
+
+		/**
+		 * Sets whether or not to enable bulk flush backoff behaviour.
+		 *
+		 * @param enabled whether or not to enable backoffs.
+		 */
+		public void setBulkFlushBackoff(boolean enabled) {
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+		}
+
+		/**
+		 * Sets the type of back of to use when flushing bulk requests.
+		 *
+		 * @param flushBackoffType the backoff type to use.
+		 */
+		public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+			this.bulkRequestsConfig.put(
+				CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+				Preconditions.checkNotNull(flushBackoffType).toString());
+		}
+
+		/**
+		 * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+		 *
+		 * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
+		 */
+		public void setBulkFlushBackoffRetries(int maxRetries) {
+			Preconditions.checkArgument(
+				maxRetries > 0,
+				"Max number of backoff attempts must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+		}
+
+		/**
+		 * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 *
+		 * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 */
+		public void setBulkFlushBackoffDelay(long delayMillis) {
+			Preconditions.checkArgument(
+				delayMillis >= 0,
+				"Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+		}
+
+		/**
+		 * Sets a failure handler for action requests.
+		 *
+		 * @param failureHandler This is used to handle failed {@link ActionRequest}.
+		 */
+		public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+			this.failureHandler = Preconditions.checkNotNull(failureHandler);
+		}
+
+		/**
+		 * Sets a REST client factory for custom client configuration.
+		 *
+		 * @param restClientFactory the factory that configures the rest client.
+		 */
+		public void setRestClientFactory(RestClientFactory restClientFactory) {
+			this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+		}
 
-		super(new Elasticsearch6ApiCallBridge(httpHosts),  userConfig, elasticsearchSinkFunction, failureHandler);
+		/**
+		 * Creates the Elasticsearch sink.
+		 *
+		 * @return the created Elasticsearch sink.
+		 */
+		public ElasticsearchSink<T> build() {
+			return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
new file mode 100644
index 0000000..4b74649
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Serializable;
+
+/**
+ * A factory that is used to configure the {@link org.elasticsearch.client.RestHighLevelClient} internally
+ * used in the {@link ElasticsearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+	/**
+	 * Configures the rest client builder.
+	 *
+	 * @param restClientBuilder the configured rest client builder.
+	 */
+	void configureRestClientBuilder(RestClientBuilder restClientBuilder);
+
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
index f419b41..8dc6216 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.InternalSettingsPreparer;
 import org.elasticsearch.node.Node;
@@ -44,11 +43,9 @@ public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElastic
 		if (node == null) {
 			Settings settings = Settings.builder()
 				.put("cluster.name", clusterName)
-				.put("http.enabled", false)
+				.put("http.enabled", true)
 				.put("path.home", tmpDataFolder.getParent())
 				.put("path.data", tmpDataFolder.getAbsolutePath())
-				.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
-				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
 				.build();
 
 			node = new PluginNode(settings);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
index 2170771..a6f0125 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -19,134 +19,82 @@
 package org.apache.flink.streaming.connectors.elasticsearch6;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
-import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 
 import org.apache.http.HttpHost;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.Test;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE directly, since it is required that the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch embedded
+ * node used in the tests to work properly.
  */
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
-
-	/**
-	 * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
-	 */
-	public void runTransportClientTest() throws Exception {
-		final String index = "transport-client-test-index";
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
-			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
-
-		env.execute("Elasticsearch RestHighLevelClient Test");
-
-		// verify the results
-		Client client = embeddedNodeEnv.getClient();
-		SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {
 
-		client.close();
+	@Test
+	public void testElasticsearchSink() throws Exception {
+		runElasticsearchSinkTest();
 	}
 
-	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
-	 */
-	public void runNullTransportClientTest() throws Exception {
-		try {
-			Map<String, String> userConfig = new HashMap<>();
-			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-			createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
-			// test passes
-			return;
-		}
-
-		fail();
+	@Test
+	public void testNullAddresses() throws Exception {
+		runNullAddressesTest();
 	}
 
-	/**
-	 * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
-	 */
-	public void runEmptyTransportClientTest() throws Exception {
-		try {
-			Map<String, String> userConfig = new HashMap<>();
-			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-			createElasticsearchSink6(userConfig,
-				Collections.<HttpHost>emptyList(),
-				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
-		} catch (IllegalArgumentException expectedException) {
-			// test passes
-			return;
-		}
-
-		fail();
+	@Test
+	public void testEmptyAddresses() throws Exception {
+		runEmptyAddressesTest();
 	}
 
-	/**
-	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
-	 */
-	public void runTransportClientFailsTest() throws Exception {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+	@Test
+	public void testInvalidElasticsearchCluster() throws Exception{
+		runInvalidElasticsearchClusterTest();
+	}
 
-		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
-			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+	@Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<HttpHost> httpHosts,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
 
-		try {
-			env.execute("Elasticsearch Transport Client Test");
-		} catch (JobExecutionException expectedException) {
-			assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
-			return;
-		}
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+		builder.setBulkFlushMaxActions(bulkFlushMaxActions);
 
-		fail();
+		return builder.build();
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return null;
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+				bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
 	}
 
 	@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
+
 		ArrayList<HttpHost> httpHosts = new ArrayList<>();
-		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
-	}
+		httpHosts.add(new HttpHost(ipAddress, 9200, "http"));
+
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+		builder.setBulkFlushMaxActions(bulkFlushMaxActions);
 
-	private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
-		Map<String, String> userConfig,
-		List<HttpHost> httpHosts,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+		return builder.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index de1670f..0000000
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch6.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
-
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
-	public static void main(String[] args) throws Exception {
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
-			@Override
-			public String map(Long value) throws Exception {
-				return "message #" + value;
-			}
-		});
-
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-		List<HttpHost> httpHosts = new ArrayList<>();
-		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-
-		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element));
-			}
-		}));
-
-		env.execute("Elasticsearch Sink Example");
-	}
-
-	private static IndexRequest createIndexRequest(String element) {
-		Map<String, Object> json = new HashMap<>();
-		json.put("data", element);
-
-		return Requests.indexRequest()
-			.index("my-index")
-			.type("my-type")
-			.id(element)
-			.source(json);
-	}
-}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
index 2055184..fcd8654 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -22,6 +22,3 @@ log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target=System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index 5544ea5..dedcbb2 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 
@@ -62,19 +61,17 @@ public class Elasticsearch6SinkExample {
 				}
 			});
 
-		Map<String, String> userConfig = new HashMap<>();
-		// This instructs the sink to emit after every element, otherwise they would be buffered
-		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
 		List<HttpHost> httpHosts = new ArrayList<>();
 		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 
-		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element, parameterTool));
-			}
-		}));
+		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+			httpHosts,
+			(String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+
+		// this instructs the sink to emit after every element, otherwise they would be buffered
+		esSinkBuilder.setBulkFlushMaxActions(1);
+
+		source.addSink(esSinkBuilder.build());
 
 		env.execute("Elasticsearch 6.x end to end sink test example");
 	}
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 7b627fe..fa6c331 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -42,15 +42,22 @@ function setup_elasticsearch {
 }
 
 function verify_elasticsearch_process_exist {
-    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
-
-    # make sure the elasticsearch node is actually running
-    if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
-      echo "Elasticsearch node is not running."
-      exit 1
-    else
-      echo "Elasticsearch node is running."
-    fi
+    for ((i=1;i<=10;i++)); do
+        local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
+
+        echo "Waiting for Elasticsearch node to start ..."
+
+        # make sure the elasticsearch node is actually running
+        if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
+            sleep 1
+        else
+            echo "Elasticsearch node is running."
+            return
+        fi
+    done
+
+    echo "Elasticsearch node did not start properly"
+    exit 1
 }
 
 function verify_result {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 7464409..c8cd2db 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -32,9 +32,6 @@ start_cluster
 
 function test_cleanup {
   shutdown_elasticsearch_cluster index
-
-  # make sure to run regular cleanup as well
-   cleanup
 }
 
 trap test_cleanup INT
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index c4620c8..ae5e58c 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\
 flink-connectors/flink-connector-elasticsearch,\
 flink-connectors/flink-connector-elasticsearch2,\
 flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch6,\
 flink-connectors/flink-connector-elasticsearch-base,\
 flink-connectors/flink-connector-filesystem,\
 flink-connectors/flink-connector-kafka-0.8,\