You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/01 13:17:49 UTC
[flink] 06/06: [FLINK-9885] [elasticsearch] Major cleanup to
finalize Elasticsearch 6.x connector
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit abbd6b02d743486f3c0c1336139dd6b3edd20840
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 17:21:34 2018 +0800
[FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector
This closes #6391.
---
docs/dev/connectors/elasticsearch.md | 192 ++++++++++++++++++---
.../elasticsearch/ElasticsearchApiCallBridge.java | 22 ++-
.../elasticsearch/ElasticsearchSinkBase.java | 24 ++-
.../elasticsearch/ElasticsearchSinkBaseTest.java | 8 +-
.../elasticsearch/ElasticsearchSinkTestBase.java | 103 +++++++----
.../elasticsearch/Elasticsearch1ApiCallBridge.java | 10 +-
.../elasticsearch/ElasticsearchSink.java | 3 +-
.../elasticsearch/ElasticsearchSinkITCase.java | 59 +++++--
.../Elasticsearch2ApiCallBridge.java | 9 +-
.../elasticsearch2/ElasticsearchSink.java | 2 +-
.../elasticsearch2/ElasticsearchSinkITCase.java | 60 +++++--
.../Elasticsearch5ApiCallBridge.java | 9 +-
.../elasticsearch5/ElasticsearchSink.java | 2 +-
.../elasticsearch5/ElasticsearchSinkITCase.java | 65 +++++--
.../flink-connector-elasticsearch6/pom.xml | 6 +-
.../Elasticsearch6ApiCallBridge.java | 35 +++-
.../elasticsearch6/ElasticsearchSink.java | 168 +++++++++++++++---
.../elasticsearch6/RestClientFactory.java | 40 +++++
.../EmbeddedElasticsearchNodeEnvironmentImpl.java | 5 +-
.../elasticsearch6/ElasticsearchSinkITCase.java | 144 +++++-----------
.../examples/ElasticsearchSinkExample.java | 81 ---------
.../src/test/resources/log4j-test.properties | 3 -
.../streaming/tests/Elasticsearch6SinkExample.java | 19 +-
.../test-scripts/elasticsearch-common.sh | 25 ++-
.../test-scripts/test_streaming_elasticsearch.sh | 3 -
tools/travis_mvn_watchdog.sh | 1 +
26 files changed, 703 insertions(+), 395 deletions(-)
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 20a7d71..bafe391 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -84,6 +84,23 @@ The example below shows how to configure and create a sink:
<div class="codetabs" markdown="1">
<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
@@ -115,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
</div>
<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
@@ -145,31 +178,83 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
</div>
<div data-lang="java, Elasticsearch 6.x" markdown="1">
{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
DataStream<String> input = ...;
List<HttpHost> httpHost = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
+// use an ElasticsearchSink.Builder to create an ElasticsearchSink
+ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+ httpHosts,
+ new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, String> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
}
-}));{% endhighlight %}
+);
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1);
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+ restClientBuilder -> {
+ restClientBuilder.setDefaultHeaders(...);
+ restClientBuilder.setMaxRetryTimeoutMillis(...);
+ restClientBuilder.setPathPrefix(...);
+ restClientBuilder.setHttpClientConfigCallback(...);
+ }
+);
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build());
+{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.common.transport.TransportAddress
+
+import java.net.InetAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
@@ -196,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
</div>
<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
@@ -222,33 +323,72 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
</div>
<div data-lang="scala, Elasticsearch 6.x" markdown="1">
{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
+
+import org.apache.http.HttpHost
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.util.ArrayList
+import java.util.List
+
val input: DataStream[String] = ...
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
-input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
- def createIndexRequest(element: String): IndexRequest = {
- val json = new java.util.HashMap[String, String]
- json.put("data", element)
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json)
+val esSinkBuilder = new ElasticsearchSink.Builder[String](
+ httpHosts,
+ new ElasticsearchSinkFunction[String] {
+ def createIndexRequest(element: String): IndexRequest = {
+ val json = new java.util.HashMap[String, String]
+ json.put("data", element)
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json)
+ }
}
-}))
+)
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1)
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+ restClientBuilder -> {
+ restClientBuilder.setDefaultHeaders(...)
+ restClientBuilder.setMaxRetryTimeoutMillis(...)
+ restClientBuilder.setPathPrefix(...)
+ restClientBuilder.setHttpClientConfigCallback(...)
+ }
+)
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build)
{% endhighlight %}
</div>
</div>
-Note how `TransportClient` based version use a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate
+with the Elasticsearch cluster (i.e., versions equal to or below 5.x), note how a `Map` of `String`s
+is used to configure the `ElasticsearchSink`. This config map will be directly
+forwarded when creating the internally used `TransportClient`.
The configuration keys are documented in the Elasticsearch documentation
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
Especially important is the `cluster.name` parameter that must correspond to
the name of your cluster.
+For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
+By default, the connector uses the default configurations for the REST client. To have custom
+configuration for the REST client, users can provide a `RestClientFactory` implementation when
+setting up the `ElasticsearchSink.Builder` that builds the sink.
+
Also note that the example only demonstrates performing a single index
request for each incoming element. Generally, the `ElasticsearchSinkFunction`
can be used to perform multiple requests of different types (ex.,
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 90d84f3..f1dcc83 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkProcessor;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
@@ -36,9 +37,11 @@ import java.util.Map;
* <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
* is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
* exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
*/
@Internal
-public abstract class ElasticsearchApiCallBridge implements Serializable {
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
/**
* Creates an Elasticsearch client implementing {@link AutoCloseable}.
@@ -46,9 +49,16 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
* @param clientConfig The configuration to use when constructing the client.
* @return The created client.
*/
- public abstract AutoCloseable createClient(Map<String, String> clientConfig);
+ C createClient(Map<String, String> clientConfig) throws IOException;
- public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
+ /**
+ * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+ *
+ * @param client the Elasticsearch client.
+ * @param listener the bulk processor listener.
+ * @return the bulk processor builder.
+ */
+ BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
/**
* Extracts the cause of failure of a bulk item action.
@@ -56,7 +66,7 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
* @param bulkItemResponse the bulk item response to extract cause of failure
* @return the extracted {@link Throwable} from the response ({@code null} is the response is successful).
*/
- public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+ @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
/**
* Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -65,14 +75,14 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
* @param builder the {@link BulkProcessor.Builder} to configure.
* @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
*/
- public abstract void configureBulkProcessorBackoff(
+ void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
/**
* Perform any necessary state cleanup.
*/
- public void cleanup() {
+ default void cleanup() {
// nothing to cleanup by default
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9830484..7dac06c 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +33,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -58,12 +60,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
* a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client} or {@RestHighLevelClient}, handle failed item responses, etc.
+ * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
*
* @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
*/
@Internal
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {
private static final long serialVersionUID = -1007596293618451942L;
@@ -84,6 +87,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
/**
* Used to control whether the retry delay should increase exponentially or remain constant.
*/
+ @PublicEvolving
public enum FlushBackoffType {
CONSTANT,
EXPONENTIAL
@@ -134,14 +138,20 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
private final Integer bulkProcessorFlushMaxActions;
private final Integer bulkProcessorFlushMaxSizeMb;
- private final Integer bulkProcessorFlushIntervalMillis;
+ private final Long bulkProcessorFlushIntervalMillis;
private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
// ------------------------------------------------------------------------
// User-facing API and configuration
// ------------------------------------------------------------------------
- /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+ /**
+ * The config map that contains configuration for the bulk flushing behaviours.
+ *
+ * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this config
+ * map would also contain Elasticsearch-shipped configuration, and therefore this config map
+ * would also be forwarded when creating the Elasticsearch client.
+ */
private final Map<String, String> userConfig;
/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
@@ -161,7 +171,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
// ------------------------------------------------------------------------
/** Call bridge for different version-specific. */
- private final ElasticsearchApiCallBridge callBridge;
+ private final ElasticsearchApiCallBridge<C> callBridge;
/**
* Number of pending action requests not yet acknowledged by Elasticsearch.
@@ -175,7 +185,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
private AtomicLong numPendingRequests = new AtomicLong(0);
/** Elasticsearch client created using the call bridge. */
- private transient AutoCloseable client;
+ private transient C client;
/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
private transient BulkProcessor bulkProcessor;
@@ -236,7 +246,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
}
if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
} else {
bulkProcessorFlushIntervalMillis = null;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 460e939..369d26a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -411,7 +411,7 @@ public class ElasticsearchSinkBaseTest {
testHarness.close();
}
- private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+ private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
private static final long serialVersionUID = 5051907841570096991L;
@@ -531,17 +531,17 @@ public class ElasticsearchSinkBaseTest {
}
}
- private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge {
+ private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> {
private static final long serialVersionUID = -4272760730959041699L;
@Override
- public AutoCloseable createClient(Map<String, String> clientConfig) {
+ public Client createClient(Map<String, String> clientConfig) {
return mock(Client.class);
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+ public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
return null;
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index df3779b..819ffba 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -34,19 +33,20 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
+ *
+ * @param <C> Elasticsearch client type
+ * @param <A> The address type to use
*/
-public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
@@ -85,24 +85,21 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
}
/**
- * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
+ * Tests that the Elasticsearch sink works properly.
*/
- public void runTransportClientTest() throws Exception {
- final String index = "transport-client-test-index";
+ public void runElasticsearchSinkTest() throws Exception {
+ final String index = "elasticsearch-sink-test-index";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", CLUSTER_NAME);
-
source.addSink(createElasticsearchSinkForEmbeddedNode(
- userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+ 1,
+ CLUSTER_NAME,
+ new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
- env.execute("Elasticsearch TransportClient Test");
+ env.execute("Elasticsearch Sink Test");
// verify the results
Client client = embeddedNodeEnv.getClient();
@@ -112,16 +109,20 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
}
/**
- * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
+ * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code null}.
*/
- public void runNullTransportClientTest() throws Exception {
+ public void runNullAddressesTest() throws Exception {
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", CLUSTER_NAME);
try {
- createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch (IllegalArgumentException expectedException) {
+ createElasticsearchSink(
+ 1,
+ CLUSTER_NAME,
+ null,
+ new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ } catch (IllegalArgumentException | NullPointerException expectedException) {
// test passes
return;
}
@@ -130,18 +131,19 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
}
/**
- * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
+ * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty.
*/
- public void runEmptyTransportClientTest() throws Exception {
+ public void runEmptyAddressesTest() throws Exception {
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", CLUSTER_NAME);
try {
createElasticsearchSink(
- userConfig,
- Collections.<InetSocketAddress>emptyList(),
- new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ 1,
+ CLUSTER_NAME,
+ Collections.emptyList(),
+ new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
} catch (IllegalArgumentException expectedException) {
// test passes
return;
@@ -153,39 +155,66 @@ public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
/**
* Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
*/
- public void runTransportClientFailsTest() throws Exception {
+ public void runInvalidElasticsearchClusterTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", "invalid-cluster-name");
- source.addSink(createElasticsearchSinkForEmbeddedNode(
- Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+ source.addSink(createElasticsearchSinkForNode(
+ 1,
+ "invalid-cluster-name",
+ new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
+ "123.123.123.123")); // incorrect ip address
try {
- env.execute("Elasticsearch Transport Client Test");
+ env.execute("Elasticsearch Sink Test");
} catch (JobExecutionException expectedException) {
- assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
+ // test passes
return;
}
fail();
}
+ /**
+ * Utility method to create a user config map.
+ */
+ protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", clusterName);
+ userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions));
+
+ return userConfig;
+ }
+
/** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
- protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<A> addresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction);
/**
* Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
*
- * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+ * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List, ElasticsearchSinkFunction)}
* because the Elasticsearch Java API to do so is incompatible across different versions.
*/
- protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception;
+
+ /**
+ * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node.
+ */
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+ String ipAddress) throws Exception;
}
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 28d5f34..4f1cd08 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
*/
@Internal
-public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> {
private static final long serialVersionUID = -2632363720584123682L;
@@ -70,7 +70,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public AutoCloseable createClient(Map<String, String> clientConfig) {
+ public Client createClient(Map<String, String> clientConfig) {
if (transportAddresses == null) {
// Make sure that we disable http access to our embedded node
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
.data(false)
.node();
- AutoCloseable client = node.client();
+ Client client = node.client();
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch client from embedded node");
@@ -116,8 +116,8 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
- return BulkProcessor.builder((Client) client, listener);
+ public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index e8eccd9..d5e1d1f 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
@@ -64,7 +65,7 @@ import java.util.Map;
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 5489290..2f1a65c 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -28,10 +28,12 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUti
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.Test;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,26 +44,26 @@ import java.util.Map;
/**
* IT Cases for the {@link ElasticsearchSink}.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client, InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception{
- runTransportClientFailsTest();
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
}
// -- Tests specific to Elasticsearch 1.x --
@@ -102,19 +104,28 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
+ elasticsearchSinkFunction);
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+ Map<String, String> userConfig = createUserConfig(bulkFlushMaxActions, clusterName);
// Elasticsearch 1.x requires this setting when using
// LocalTransportAddress to connect to a local embedded node
- userConfig = new HashMap<>(userConfig);
userConfig.put("node.local", "true");
List<TransportAddress> transports = new ArrayList<>();
@@ -126,6 +137,22 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
elasticsearchSinkFunction);
}
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
+
+ List<TransportAddress> transports = new ArrayList<>();
+ transports.add(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300));
+
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transports,
+ elasticsearchSinkFunction);
+ }
+
/**
* A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
*/
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 80c1b3a..73a69eb 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@@ -44,7 +43,7 @@ import java.util.Map;
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
*/
@Internal
-public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
private static final long serialVersionUID = 2638252694744361079L;
@@ -63,7 +62,7 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public AutoCloseable createClient(Map<String, String> clientConfig) {
+ public TransportClient createClient(Map<String, String> clientConfig) {
Settings settings = Settings.settingsBuilder().put(clientConfig).build();
TransportClient transportClient = TransportClient.builder().settings(settings).build();
@@ -84,8 +83,8 @@ public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
- return BulkProcessor.builder((Client) client, listener);
+ public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index ffccacf..a911905 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -58,7 +58,7 @@ import java.util.Map;
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 7ded893..7887e72 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -17,57 +17,81 @@
package org.apache.flink.streaming.connectors.elasticsearch2;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.elasticsearch.client.transport.TransportClient;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* IT cases for the {@link ElasticsearchSink}.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception{
- runTransportClientFailsTest();
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transportAddresses,
+ elasticsearchSinkFunction);
+ }
+
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+ return createElasticsearchSinkForNode(
+ bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+ transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
- return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transports,
+ elasticsearchSinkFunction);
}
}
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 1e73feb..a3453ec 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@@ -47,7 +46,7 @@ import java.util.Map;
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
*/
@Internal
-public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
private static final long serialVersionUID = -5222683870097809633L;
@@ -66,7 +65,7 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public AutoCloseable createClient(Map<String, String> clientConfig) {
+ public TransportClient createClient(Map<String, String> clientConfig) {
Settings settings = Settings.builder().put(clientConfig)
.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -90,8 +89,8 @@ public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
- return BulkProcessor.builder((Client) client, listener);
+ public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 6c09337..b99b353 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -59,7 +59,7 @@ import java.util.Map;
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index ad7c664..67daa40 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -18,58 +18,85 @@
package org.apache.flink.streaming.connectors.elasticsearch5;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.elasticsearch.client.transport.TransportClient;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE directly, because the Log4J-to-SLF4J
+ * adapter dependency must be excluded from the test classpath for the embedded Elasticsearch node used in
+ * the tests to work properly.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception {
- runTransportClientFailsTest();
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
- List<InetSocketAddress> transportAddresses,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<InetSocketAddress> addresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ addresses,
+ elasticsearchSinkFunction);
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+ return createElasticsearchSinkForNode(
+ bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
+ }
+
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+ transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
- return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+ return new ElasticsearchSink<>(
+ Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transports,
+ elasticsearchSinkFunction);
}
-
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index e453837..ef06d80 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -81,7 +81,7 @@ under the License.
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
- <version>2.7</version>
+ <version>2.9.1</version>
</dependency>
<!-- test dependencies -->
@@ -141,14 +141,14 @@ under the License.
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
- <version>2.7</version>
+ <version>2.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
- <version>2.7</version>
+ <version>2.9.1</version>
<scope>test</scope>
</dependency>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 2cb4ea0..03bf9c0 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.elasticsearch6;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.util.Preconditions;
@@ -26,6 +27,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
@@ -33,13 +35,15 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
*/
-public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+@Internal
+public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
private static final long serialVersionUID = -5222683870097809633L;
@@ -50,15 +54,31 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
*/
private final List<HttpHost> httpHosts;
- Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+ /**
+ * The factory to configure the rest client.
+ */
+ private final RestClientFactory restClientFactory;
+
+ Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
this.httpHosts = httpHosts;
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
}
@Override
- public AutoCloseable createClient(Map<String, String> clientConfig) {
- RestHighLevelClient rhlClient =
- new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
+ public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
+ RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+ restClientFactory.configureRestClientBuilder(builder);
+
+ RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+ }
+
+ if (!rhlClient.ping()) {
+ throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+ }
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
@@ -68,9 +88,8 @@ public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
- RestHighLevelClient rhlClient = (RestHighLevelClient) client;
- return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+ public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client::bulkAsync, listener);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 3f75b5f..4e7a263 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -17,17 +17,19 @@
package org.apache.flink.streaming.connectors.elasticsearch6;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestHighLevelClient;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,10 +40,6 @@ import java.util.Map;
* <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
* The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
*
- * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
- * in the <a href="https://www.elastic.io">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
- * which should be set to the name of the cluster that the sink should emit to.
- *
* <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
* This will buffer elements before sending a request to the cluster. The behaviour of the
* {@code BulkProcessor} can be configured using these config keys:
@@ -58,34 +56,156 @@ import java.util.Map;
*
* @param <T> Type of the elements handled by this sink
*/
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
private static final long serialVersionUID = 1L;
- /**
- * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
- *
- * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
- * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
- */
- public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ private ElasticsearchSink(
+ Map<String, String> bulkRequestsConfig,
+ List<HttpHost> httpHosts,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler,
+ RestClientFactory restClientFactory) {
- this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+ super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
}
/**
- * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+ * A builder for creating an {@link ElasticsearchSink}.
*
- * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
- * @param failureHandler This is used to handle failed {@link ActionRequest}
- * @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to.
+ * @param <T> Type of the elements handled by the sink this builder creates.
*/
- public ElasticsearchSink(
- Map<String, String> userConfig,
- List<HttpHost> httpHosts,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
- ActionRequestFailureHandler failureHandler) {
+ @PublicEvolving
+ public static class Builder<T> {
+
+ private final List<HttpHost> httpHosts;
+ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+ private Map<String, String> bulkRequestsConfig = new HashMap<>();
+ private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+ private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+ /**
+ * Creates a new builder for an {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+ *
+ * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects.
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element.
+ */
+ public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ this.httpHosts = Preconditions.checkNotNull(httpHosts);
+ this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+ }
+
+ /**
+ * Sets the maximum number of actions to buffer for each bulk request.
+ *
+ * @param numMaxActions the maximum number of actions to buffer per bulk request.
+ */
+ public void setBulkFlushMaxActions(int numMaxActions) {
+ Preconditions.checkArgument(
+ numMaxActions > 0,
+ "Max number of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+ }
+
+ /**
+ * Sets the maximum size of buffered actions, in mb, per bulk request.
+ *
+ * @param maxSizeMb the maximum size of buffered actions, in mb.
+ */
+ public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+ Preconditions.checkArgument(
+ maxSizeMb > 0,
+ "Max size of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+ }
+
+ /**
+ * Sets the bulk flush interval, in milliseconds.
+ *
+ * @param intervalMillis the bulk flush interval, in milliseconds.
+ */
+ public void setBulkFlushInterval(long intervalMillis) {
+ Preconditions.checkArgument(
+ intervalMillis >= 0,
+ "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+ }
+
+ /**
+ * Sets whether or not to enable bulk flush backoff behaviour.
+ *
+ * @param enabled whether or not to enable backoffs.
+ */
+ public void setBulkFlushBackoff(boolean enabled) {
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+ }
+
+ /**
+ * Sets the type of backoff to use when flushing bulk requests.
+ *
+ * @param flushBackoffType the backoff type to use.
+ */
+ public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+ Preconditions.checkNotNull(flushBackoffType).toString());
+ }
+
+ /**
+ * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+ *
+ * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
+ */
+ public void setBulkFlushBackoffRetries(int maxRetries) {
+ Preconditions.checkArgument(
+ maxRetries > 0,
+ "Max number of backoff attempts must be larger than 0.");
+
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+ }
+
+ /**
+ * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+ *
+ * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+ */
+ public void setBulkFlushBackoffDelay(long delayMillis) {
+ Preconditions.checkArgument(
+ delayMillis >= 0,
+ "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+ this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+ }
+
+ /**
+ * Sets a failure handler for action requests.
+ *
+ * @param failureHandler This is used to handle failed {@link ActionRequest}.
+ */
+ public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+ this.failureHandler = Preconditions.checkNotNull(failureHandler);
+ }
+
+ /**
+ * Sets a REST client factory for custom client configuration.
+ *
+ * @param restClientFactory the factory that configures the rest client.
+ */
+ public void setRestClientFactory(RestClientFactory restClientFactory) {
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
- super(new Elasticsearch6ApiCallBridge(httpHosts), userConfig, elasticsearchSinkFunction, failureHandler);
+ /**
+ * Creates the Elasticsearch sink.
+ *
+ * @return the created Elasticsearch sink.
+ */
+ public ElasticsearchSink<T> build() {
+ return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
+ }
}
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
new file mode 100644
index 0000000..4b74649
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Serializable;
+
+/**
+ * A factory that is used to configure the {@link org.elasticsearch.client.RestHighLevelClient} internally
+ * used in the {@link ElasticsearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+ /**
+ * Configures the rest client builder.
+ *
+ * @param restClientBuilder the rest client builder to configure.
+ */
+ void configureRestClientBuilder(RestClientBuilder restClientBuilder);
+
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
index f419b41..8dc6216 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
import org.elasticsearch.client.Client;
-import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
@@ -44,11 +43,9 @@ public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElastic
if (node == null) {
Settings settings = Settings.builder()
.put("cluster.name", clusterName)
- .put("http.enabled", false)
+ .put("http.enabled", true)
.put("path.home", tmpDataFolder.getParent())
.put("path.data", tmpDataFolder.getAbsolutePath())
- .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
- .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.build();
node = new PluginNode(settings);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
index 2170771..a6f0125 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -19,134 +19,82 @@
package org.apache.flink.streaming.connectors.elasticsearch6;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
-import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
import org.apache.http.HttpHost;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.Test;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE directly, because the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch embedded
+ * node used in the tests to work properly.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
-
- /**
- * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
- */
- public void runTransportClientTest() throws Exception {
- final String index = "transport-client-test-index";
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
- new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
-
- env.execute("Elasticsearch RestHighLevelClient Test");
-
- // verify the results
- Client client = embeddedNodeEnv.getClient();
- SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {
- client.close();
+ @Test
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
- /**
- * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}.
- */
- public void runNullTransportClientTest() throws Exception {
- try {
- Map<String, String> userConfig = new HashMap<>();
- userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch (IllegalArgumentException expectedException) {
- // test passes
- return;
- }
-
- fail();
+ @Test
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
- /**
- * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty.
- */
- public void runEmptyTransportClientTest() throws Exception {
- try {
- Map<String, String> userConfig = new HashMap<>();
- userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- createElasticsearchSink6(userConfig,
- Collections.<HttpHost>emptyList(),
- new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch (IllegalArgumentException expectedException) {
- // test passes
- return;
- }
-
- fail();
+ @Test
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
- /**
- * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
- */
- public void runTransportClientFailsTest() throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
-
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+ @Test
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
+ }
- source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
- new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<HttpHost> httpHosts,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
- try {
- env.execute("Elasticsearch Transport Client Test");
- } catch (JobExecutionException expectedException) {
- assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
- return;
- }
+ ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+ builder.setBulkFlushMaxActions(bulkFlushMaxActions);
- fail();
+ return builder.build();
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- return null;
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+ return createElasticsearchSinkForNode(
+ bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
}
@Override
- protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
+
ArrayList<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
- return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
- }
+ httpHosts.add(new HttpHost(ipAddress, 9200, "http"));
+
+ ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+ builder.setBulkFlushMaxActions(bulkFlushMaxActions);
- private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
- Map<String, String> userConfig,
- List<HttpHost> httpHosts,
- ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+ return builder.build();
}
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
deleted file mode 100644
index de1670f..0000000
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch6.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
-
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the name of cluster in the config map.
- */
-public class ElasticsearchSinkExample {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
- @Override
- public String map(Long value) throws Exception {
- return "message #" + value;
- }
- });
-
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- List<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-
- source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }));
-
- env.execute("Elasticsearch Sink Example");
- }
-
- private static IndexRequest createIndexRequest(String element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element)
- .source(json);
- }
-}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
index 2055184..fcd8654 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -22,6 +22,3 @@ log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target=System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index 5544ea5..dedcbb2 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
@@ -62,19 +61,17 @@ public class Elasticsearch6SinkExample {
}
});
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
- source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+ httpHosts,
+ (String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+
+ // this instructs the sink to emit after every element, otherwise they would be buffered
+ esSinkBuilder.setBulkFlushMaxActions(1);
+
+ source.addSink(esSinkBuilder.build());
env.execute("Elasticsearch 6.x end to end sink test example");
}
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 7b627fe..fa6c331 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -42,15 +42,22 @@ function setup_elasticsearch {
}
function verify_elasticsearch_process_exist {
- local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
-
- # make sure the elasticsearch node is actually running
- if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
- echo "Elasticsearch node is not running."
- exit 1
- else
- echo "Elasticsearch node is running."
- fi
+ for ((i=1;i<=10;i++)); do
+ local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
+
+ echo "Waiting for Elasticsearch node to start ..."
+
+ # make sure the elasticsearch node is actually running
+ if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
+ sleep 1
+ else
+ echo "Elasticsearch node is running."
+ return
+ fi
+ done
+
+ echo "Elasticsearch node did not start properly"
+ exit 1
}
function verify_result {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 7464409..c8cd2db 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -32,9 +32,6 @@ start_cluster
function test_cleanup {
shutdown_elasticsearch_cluster index
-
- # make sure to run regular cleanup as well
- cleanup
}
trap test_cleanup INT
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index c4620c8..ae5e58c 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\
flink-connectors/flink-connector-elasticsearch,\
flink-connectors/flink-connector-elasticsearch2,\
flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch6,\
flink-connectors/flink-connector-elasticsearch-base,\
flink-connectors/flink-connector-filesystem,\
flink-connectors/flink-connector-kafka-0.8,\