You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/01 13:17:47 UTC
[flink] 04/06: [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST
support
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 235dad10f23306879fad0cb1577809c767a26f5e
Author: Christophe Jolif <cj...@gmail.com>
AuthorDate: Thu Jan 25 22:31:57 2018 +0100
[FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support
---
docs/dev/connectors/elasticsearch.md | 55 ++++++-
.../elasticsearch/ElasticsearchApiCallBridge.java | 3 +-
.../elasticsearch/ElasticsearchSinkBase.java | 3 +-
.../elasticsearch/ElasticsearchSinkBaseTest.java | 10 +-
.../EmbeddedElasticsearchNodeEnvironment.java | 2 +-
.../elasticsearch/Elasticsearch1ApiCallBridge.java | 2 +-
.../flink-connector-elasticsearch6/pom.xml | 180 +++++++++++++++++++++
.../Elasticsearch6ApiCallBridge.java | 110 +++++++++++++
.../elasticsearch6/ElasticsearchSink.java | 91 +++++++++++
.../EmbeddedElasticsearchNodeEnvironmentImpl.java | 82 ++++++++++
.../elasticsearch6/ElasticsearchSinkITCase.java | 152 +++++++++++++++++
.../examples/ElasticsearchSinkExample.java | 81 ++++++++++
.../src/test/resources/log4j-test.properties | 27 ++++
flink-connectors/pom.xml | 1 +
14 files changed, 786 insertions(+), 13 deletions(-)
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 52d1b58..20a7d71 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -55,6 +55,11 @@ of the Elasticsearch installation:
<td>1.3.0</td>
<td>5.x</td>
</tr>
+ <tr>
+ <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td>
+ <td>1.6.0</td>
+ <td>6 and later versions</td>
+ </tr>
</tbody>
</table>
@@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document actions against your clu
## Elasticsearch Sink
-The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
Elasticsearch cluster.
The example below shows how to configure and create a sink:
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
}
}));{% endhighlight %}
</div>
+<div data-lang="java, Elasticsearch 6.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+Map<String, String> config = new HashMap<>();
+config.put("bulk.flush.max.actions", "1");
+List<HttpHost> httpHosts = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+input.addSink(new ElasticsearchSink<>(config, httpHosts, new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, String> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
+}));{% endhighlight %}
+</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
}))
{% endhighlight %}
</div>
+<div data-lang="scala, Elasticsearch 6.x" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new java.util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
+input.addSink(new ElasticsearchSink(config, httpHosts, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
+    json.put("data", element)
+    Requests.indexRequest().index("my-index").`type`("my-type").source(json)
+  }
+  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+    indexer.add(createIndexRequest(element))
+  }
+}))
+{% endhighlight %}
+</div>
</div>
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`; for the `TransportClient`-based versions (before 6.x) it also configures the client itself.
The configuration keys are documented in the Elasticsearch documentation
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
Especially important is the `cluster.name` parameter that must correspond to
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 1c501bf..90d84f3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -41,8 +41,7 @@ import java.util.Map;
public abstract class ElasticsearchApiCallBridge implements Serializable {
/**
- * Creates an Elasticsearch client implementing {@link AutoCloseable}. This can
- * be a {@link org.elasticsearch.client.Client} or {@link org.elasticsearch.client.RestHighLevelClient}
+ * Creates an Elasticsearch client implementing {@link AutoCloseable}.
*
* @param clientConfig The configuration to use when constructing the client.
* @return The created client.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 0305ee3d..9830484 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -32,7 +32,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -59,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
* a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
+ * for example, to create an Elasticsearch {@code Client} or {@code RestHighLevelClient}, handle failed item responses, etc.
*
* @param <T> Type of the elements handled by this sink
*/
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 5a161a7..460e939 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -540,6 +540,11 @@ public class ElasticsearchSinkBaseTest {
return mock(Client.class);
}
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+ return null;
+ }
+
@Nullable
@Override
public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
@@ -551,11 +556,6 @@ public class ElasticsearchSinkBaseTest {
}
@Override
- public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
- return null;
- }
-
- @Override
public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
// no need for this in the test cases here
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index ea6e7a3..fd14ba3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -29,7 +29,7 @@ import java.io.File;
* also be located under the same package. The intentional package-private accessibility of this interface
* enforces that.
*/
-interface EmbeddedElasticsearchNodeEnvironment {
+public interface EmbeddedElasticsearchNodeEnvironment {
/**
* Start an embedded Elasticsearch node instance.
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 6f49206..28d5f34 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
.data(false)
.node();
- Client client = node.client();
+ AutoCloseable client = node.client();
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch client from embedded node");
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
new file mode 100644
index 0000000..e453837
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+ <name>flink-connector-elasticsearch6</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <elasticsearch.version>6.3.1</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- Exclude the Elasticsearch core version pulled in by the base module; the 6.x one comes with the REST high-level client below -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Dependency for Elasticsearch 6.x REST Client -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+ <!--
+ Elasticsearch 5.x and later (including 6.x) use Log4j2 and no longer detect logging implementations, making
+ Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+ Elasticsearch 6.x is routed to SLF4J. This way, user projects can remain flexible
+ in the logging implementation preferred.
+ -->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <version>2.7</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Elasticsearch transport dependencies for the embedded test node. 6.x no longer ships Netty3, so the Netty4 transport plugin is used.
+ -->
+
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Including Log4j2 dependencies for tests is required for the
+ embedded Elasticsearch nodes used in tests to run correctly.
+ -->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>2.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!--
+ For the tests, we need to exclude the Log4j2 to slf4j adapter dependency
+ and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node
+ used in tests will fail to work.
+
+ In other words, the connector jar is routing Elasticsearch 6.x's Log4j2 APIs to SLF4J,
+ but for the test builds, we still stick to directly using Log4j2.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.2</version>
+ <configuration>
+ <classpathDependencyExcludes>
+ <classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+ </classpathDependencyExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 0000000..2cb4ea0
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ *
+ * <p>Clients created by this bridge are {@link RestHighLevelClient RestHighLevelClients} that
+ * communicate with the cluster over HTTP, using the {@link HttpHost HttpHosts} given to the constructor.
+ */
+public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -5222683870097809633L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+	/**
+	 * User-provided HTTP hosts. Kept as a private {@link ArrayList} copy so that this (serializable) bridge
+	 * does not depend on the serializability of the caller's {@link List} implementation and is not
+	 * affected by later modifications of that list.
+	 */
+	private final List<HttpHost> httpHosts;
+
+	/**
+	 * Creates a call bridge that connects to the given HTTP hosts.
+	 *
+	 * @param httpHosts The hosts the {@link RestHighLevelClient} connects to; must be non-null and non-empty.
+	 */
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty(),
+			"At least one HttpHost must be provided.");
+		this.httpHosts = new ArrayList<>(httpHosts);
+	}
+
+	/**
+	 * Creates a {@link RestHighLevelClient} for the configured HTTP hosts.
+	 *
+	 * <p>The {@code clientConfig} is not used by this implementation: the REST client is built solely
+	 * from the HTTP hosts passed to the constructor. No connectivity check is performed here.
+	 */
+	@Override
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
+		RestHighLevelClient rhlClient =
+			new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[0])));
+
+		LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts);
+
+		return rhlClient;
+	}
+
+	/**
+	 * Creates a {@link BulkProcessor.Builder} that issues bulk requests asynchronously via the given client.
+	 *
+	 * @param client A client created by {@link #createClient(Map)}, i.e. a {@link RestHighLevelClient}.
+	 * @param listener The listener to register with the {@link BulkProcessor}.
+	 */
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		RestHighLevelClient rhlClient = (RestHighLevelClient) client;
+		return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+	}
+
+	/**
+	 * Returns the failure cause of a bulk item response, or {@code null} if the item did not fail.
+	 */
+	@Nullable
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	/**
+	 * Translates Flink's {@link ElasticsearchSinkBase.BulkFlushBackoffPolicy} into an Elasticsearch
+	 * {@link BackoffPolicy} and sets it on the builder. Without a configured policy, no backoff is used.
+	 */
+	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					// any other backoff type is treated as exponential
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 0000000..3f75b5f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster,
+ * connecting to the {@link HttpHost HttpHosts} passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor configures the sink itself, in particular the
+ * {@link BulkProcessor} (see the keys below). Unlike the {@code TransportClient}-based sinks for earlier
+ * Elasticsearch versions, the REST client does not read settings such as {@code cluster.name} from it.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 * Failed requests are handled by a {@link NoOpFailureHandler}.
+	 *
+	 * @param userConfig The configuration of the sink, e.g. the {@code bulk.flush.*} keys of the {@link BulkProcessor}
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects to
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 *
+	 * @param userConfig The configuration of the sink, e.g. the {@code bulk.flush.*} keys of the {@link BulkProcessor}
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects to
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch6ApiCallBridge(httpHosts), userConfig, elasticsearchSinkFunction, failureHandler);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..f419b41
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	/**
+	 * Starts an embedded node for the given cluster name, keeping its data in {@code tmpDataFolder}.
+	 * Does nothing if a node has already been started.
+	 */
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			Settings settings = Settings.builder()
+				.put("cluster.name", clusterName)
+				// HTTP must be enabled: the Elasticsearch 6 sink talks to the node via the REST (HTTP) client
+				.put("http.enabled", true)
+				.put("path.home", tmpDataFolder.getParent())
+				.put("path.data", tmpDataFolder.getAbsolutePath())
+				.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
+				.build();
+
+			node = new PluginNode(settings);
+			node.start();
+		}
+	}
+
+	/**
+	 * Closes the embedded node, if any, and forgets it so that a later {@link #start(File, String)} creates a new one.
+	 */
+	@Override
+	public void close() throws Exception {
+		if (node != null) {
+			if (!node.isClosed()) {
+				node.close();
+			}
+			node = null;
+		}
+	}
+
+	/**
+	 * Returns a {@link Client} for the running node, or {@code null} if no node is running.
+	 */
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * {@link Node} created with the {@link Netty4Plugin}, so that the Netty4 HTTP and transport types
+	 * selected in {@code start(File, String)} are available.
+	 */
+	private static class PluginNode extends Node {
+		public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty4Plugin.class));
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..2170771
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>NOTE(review): no method in this class carries {@code @Test}. Confirm that
+ * {@link ElasticsearchSinkTestBase} exposes these {@code run*} methods to JUnit; otherwise
+ * none of these cases execute.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+	/** Host of the embedded node's HTTP endpoint. */
+	private static final String EMBEDDED_NODE_HOST = "127.0.0.1";
+
+	/** Port of the embedded node's HTTP endpoint. */
+	private static final int EMBEDDED_NODE_HTTP_PORT = 9200;
+
+	/**
+	 * Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}.
+	 */
+	public void runTransportClientTest() throws Exception {
+		final String index = "transport-client-test-index";
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+		env.execute("Elasticsearch RestHighLevelClient Test");
+
+		// verify the results; release the client even when verification fails
+		Client client = embeddedNodeEnv.getClient();
+		try {
+			SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+		} finally {
+			if (client != null) {
+				client.close();
+			}
+		}
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of HTTP hosts is {@code null}.
+	 */
+	public void runNullTransportClientTest() throws Exception {
+		try {
+			Map<String, String> userConfig = new HashMap<>();
+			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+			createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests that the Elasticsearch sink fails eagerly if the provided list of HTTP hosts is empty.
+	 */
+	public void runEmptyTransportClientTest() throws Exception {
+		try {
+			Map<String, String> userConfig = new HashMap<>();
+			userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+			createElasticsearchSink6(userConfig,
+				Collections.<HttpHost>emptyList(),
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
+	 *
+	 * <p>The sink is pointed at a local port with no listener. Pointing it at the embedded node
+	 * would let the job succeed whenever that node is reachable.
+	 */
+	public void runTransportClientFailsTest() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<HttpHost> unreachableHosts = new ArrayList<>();
+		unreachableHosts.add(new HttpHost(EMBEDDED_NODE_HOST, findUnusedLocalPort(), "http"));
+
+		source.addSink(createElasticsearchSink6(userConfig, unreachableHosts,
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+		try {
+			env.execute("Elasticsearch Transport Client Test");
+		} catch (JobExecutionException expectedException) {
+			// The failure comes from the REST client, not a TransportClient, so the transport
+			// client's "no nodes" message cannot be relied on. Only require an underlying cause.
+			assertTrue(expectedException.getCause() != null);
+			return;
+		}
+
+		fail();
+	}
+
+	/**
+	 * Not supported: the Elasticsearch 6 sink is configured with {@link HttpHost}s rather than
+	 * transport {@link InetSocketAddress}es. Use {@link #createElasticsearchSink6} instead.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		throw new UnsupportedOperationException(
+			"The Elasticsearch 6 sink is configured with HttpHosts, not transport addresses; use createElasticsearchSink6.");
+	}
+
+	/**
+	 * Creates a sink that targets the embedded node's HTTP endpoint.
+	 */
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+		List<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost(EMBEDDED_NODE_HOST, EMBEDDED_NODE_HTTP_PORT, "http"));
+		return createElasticsearchSink6(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+
+	/**
+	 * Creates an Elasticsearch 6 sink for the given HTTP hosts.
+	 *
+	 * @param userConfig sink configuration (bulk flush settings etc.)
+	 * @param httpHosts hosts the sink connects to; passed straight to the sink constructor
+	 * @param elasticsearchSinkFunction function turning elements into index requests
+	 */
+	private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+
+	/**
+	 * Returns a local port that had no listener at the time of the call. It works by briefly
+	 * binding an ephemeral server socket and then releasing it.
+	 */
+	private static int findUnusedLocalPort() throws Exception {
+		try (ServerSocket socket = new ServerSocket(0)) {
+			return socket.getLocalPort();
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..de1670f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch 6 sink, which connects to the cluster through
+ * the configured list of HTTP hosts. Before running it, make sure an Elasticsearch node is
+ * reachable at {@code http://127.0.0.1:9200}, or change the host constants below.
+ *
+ * <p>The messages {@code "message #0"} through {@code "message #20"} are indexed into
+ * {@code my-index} with type {@code my-type}. Each message text is used as its document id.
+ */
+public class ElasticsearchSinkExample {
+
+	/** Host of the Elasticsearch node's HTTP endpoint. */
+	private static final String ES_HOST = "127.0.0.1";
+
+	/** Port of the Elasticsearch node's HTTP endpoint. */
+	private static final int ES_HTTP_PORT = 9200;
+
+	/** Index the example writes into. */
+	private static final String INDEX = "my-index";
+
+	/** Document type the example writes. */
+	private static final String TYPE = "my-type";
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+
+		Map<String, String> userConfig = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost(ES_HOST, ES_HTTP_PORT, "http"));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, httpHosts, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		}));
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	/**
+	 * Builds an index request storing {@code element} under the {@code data} field. The element
+	 * itself is used as the document id.
+	 */
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(INDEX)
+			.type(TYPE)
+			.id(element)
+			.source(json);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3afb779..cacea91 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
<module>flink-connector-elasticsearch</module>
<module>flink-connector-elasticsearch2</module>
<module>flink-connector-elasticsearch5</module>
+ <module>flink-connector-elasticsearch6</module>
<module>flink-connector-rabbitmq</module>
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>