You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/01 13:17:47 UTC

[flink] 04/06: [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 235dad10f23306879fad0cb1577809c767a26f5e
Author: Christophe Jolif <cj...@gmail.com>
AuthorDate: Thu Jan 25 22:31:57 2018 +0100

    [FLINK-8101] [elasticsearch] Elasticsearch 6.X REST support
---
 docs/dev/connectors/elasticsearch.md               |  55 ++++++-
 .../elasticsearch/ElasticsearchApiCallBridge.java  |   3 +-
 .../elasticsearch/ElasticsearchSinkBase.java       |   3 +-
 .../elasticsearch/ElasticsearchSinkBaseTest.java   |  10 +-
 .../EmbeddedElasticsearchNodeEnvironment.java      |   2 +-
 .../elasticsearch/Elasticsearch1ApiCallBridge.java |   2 +-
 .../flink-connector-elasticsearch6/pom.xml         | 180 +++++++++++++++++++++
 .../Elasticsearch6ApiCallBridge.java               | 110 +++++++++++++
 .../elasticsearch6/ElasticsearchSink.java          |  91 +++++++++++
 .../EmbeddedElasticsearchNodeEnvironmentImpl.java  |  82 ++++++++++
 .../elasticsearch6/ElasticsearchSinkITCase.java    | 152 +++++++++++++++++
 .../examples/ElasticsearchSinkExample.java         |  81 ++++++++++
 .../src/test/resources/log4j-test.properties       |  27 ++++
 flink-connectors/pom.xml                           |   1 +
 14 files changed, 786 insertions(+), 13 deletions(-)

diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 52d1b58..20a7d71 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -55,6 +55,11 @@ of the Elasticsearch installation:
         <td>1.3.0</td>
         <td>5.x</td>
     </tr>
+    <tr>
+        <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td>
+        <td>1.6.0</td>
+        <td>6 and later versions</td>
+    </tr>
   </tbody>
 </table>
 
@@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document actions against your clu
 
 ## Elasticsearch Sink
 
-The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
 Elasticsearch cluster.
 
 The example below shows how to configure and create a sink:
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
     }
 }));{% endhighlight %}
 </div>
+<div data-lang="java, Elasticsearch 6.x" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+List<HttpHost> httpHosts = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+    
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+    
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
+}));{% endhighlight %}
+</div>
 <div data-lang="scala, Elasticsearch 1.x" markdown="1">
 {% highlight scala %}
 val input: DataStream[String] = ...
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 </div>
+<div data-lang="scala, Elasticsearch 6.x" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
+
+input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+    val json = new java.util.HashMap[String, String]
+    json.put("data", element)
+    
+    return Requests.indexRequest()
+            .index("my-index")
+            .type("my-type")
+            .source(json)
+  }
+}))
+{% endhighlight %}
+</div>
 </div>
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how the `TransportClient`-based versions use a `Map` of `String`s to configure the `ElasticsearchSink`.
 The configuration keys are documented in the Elasticsearch documentation
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
 Especially important is the `cluster.name` parameter that must correspond to
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 1c501bf..90d84f3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -41,8 +41,7 @@ import java.util.Map;
 public abstract class ElasticsearchApiCallBridge implements Serializable {
 
 	/**
-	 * Creates an Elasticsearch client implementing {@link AutoCloseable}. This can
-	 * be a {@link org.elasticsearch.client.Client} or {@link org.elasticsearch.client.RestHighLevelClient}
+	 * Creates an Elasticsearch client implementing {@link AutoCloseable}.
 	 *
 	 * @param clientConfig The configuration to use when constructing the client.
 	 * @return The created client.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 0305ee3d..9830484 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -32,7 +32,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -59,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
  * a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
- * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
+ * for example, to create an Elasticsearch {@code Client} or {@code RestHighLevelClient}, handle failed item responses, etc.
  *
  * @param <T> Type of the elements handled by this sink
  */
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 5a161a7..460e939 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -540,6 +540,11 @@ public class ElasticsearchSinkBaseTest {
 			return mock(Client.class);
 		}
 
+		@Override
+		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+			return null;
+		}
+
 		@Nullable
 		@Override
 		public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
@@ -551,11 +556,6 @@ public class ElasticsearchSinkBaseTest {
 		}
 
 		@Override
-		public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
-			return null;
-		}
-
-		@Override
 		public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
 			// no need for this in the test cases here
 		}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index ea6e7a3..fd14ba3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -29,7 +29,7 @@ import java.io.File;
  *       also be located under the same package. The intentional package-private accessibility of this interface
  *       enforces that.
  */
-interface EmbeddedElasticsearchNodeEnvironment {
+public interface EmbeddedElasticsearchNodeEnvironment {
 
 	/**
 	 * Start an embedded Elasticsearch node instance.
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 6f49206..28d5f34 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -85,7 +85,7 @@ public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 				.data(false)
 				.node();
 
-			Client client = node.client();
+			AutoCloseable client = node.client();
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Created Elasticsearch client from embedded node");
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
new file mode 100644
index 0000000..e453837
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+	<name>flink-connector-elasticsearch6</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>6.3.1</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Dependency for Elasticsearch 6.x REST Client -->
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<!--
+			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+			in the logging implementation preferred.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-to-slf4j</artifactId>
+			<version>2.7</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<!--
+   			 Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 6.x
+		-->
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.plugin</groupId>
+			<artifactId>transport-netty4-client</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!--
+			Including Log4j2 dependencies for tests is required for the
+			embedded Elasticsearch nodes used in tests to run correctly.
+		-->
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!--
+				For the tests, we need to exclude the Log4j2 to slf4j adapter dependency
+				and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node
+				used in tests will fail to work.
+
+				In other words, the connector jar is routing Elasticsearch 5.x's Log4j2 API's to SLF4J,
+				but for the test builds, we still stick to directly using Log4j2.
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.12.2</version>
+				<configuration>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+					</classpathDependencyExcludes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 0000000..2cb4ea0
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ */
+public class Elasticsearch6ApiCallBridge extends ElasticsearchApiCallBridge {
+
+	private static final long serialVersionUID = -5222683870097809633L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+	/**
+	 * User-provided HTTP Host.
+	 */
+	private final List<HttpHost> httpHosts;
+
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
+		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+		this.httpHosts = httpHosts;
+	}
+
+	@Override
+	public AutoCloseable createClient(Map<String, String> clientConfig) {
+		RestHighLevelClient rhlClient =
+			new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
+		}
+
+		return rhlClient;
+	}
+
+	@Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener) {
+		RestHighLevelClient rhlClient = (RestHighLevelClient) client;
+		return BulkProcessor.builder(rhlClient::bulkAsync, listener);
+	}
+
+	@Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+		if (!bulkItemResponse.isFailed()) {
+			return null;
+		} else {
+			return bulkItemResponse.getFailure().getCause();
+		}
+	}
+
+	@Override
+	public void configureBulkProcessorBackoff(
+		BulkProcessor.Builder builder,
+		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+		builder.setBackoffPolicy(backoffPolicy);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 0000000..3f75b5f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided HTTP hosts passed to the constructor.
+ *
+ * <p>The {@link Map} passed to the constructor is not used to create the {@link RestHighLevelClient}; the client is
+ * built solely from the provided list of {@link HttpHost}s. The map is handed to {@link ElasticsearchSinkBase} as the
+ * user configuration, for example for the {@code BulkProcessor} settings described below.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 *
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects.
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+
+		this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler());
+	}
+
+	/**
+	 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+	 *
+	 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
+	 * @param failureHandler This is used to handle failed {@link ActionRequest}
+	 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects.
+	 */
+	public ElasticsearchSink(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
+
+		super(new Elasticsearch6ApiCallBridge(httpHosts),  userConfig, elasticsearchSinkFunction, failureHandler);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..f419b41
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+	private Node node;
+
+	@Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			Settings settings = Settings.builder()
+				.put("cluster.name", clusterName)
+				.put("http.enabled", false)
+				.put("path.home", tmpDataFolder.getParent())
+				.put("path.data", tmpDataFolder.getAbsolutePath())
+				.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+				.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
+				.build();
+
+			node = new PluginNode(settings);
+			node.start();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (node != null && !node.isClosed()) {
+			node.close();
+			node = null;
+		}
+	}
+
+	@Override
+	public Client getClient() {
+		if (node != null && !node.isClosed()) {
+			return node.client();
+		} else {
+			return null;
+		}
+	}
+
+	private static class PluginNode extends Node {
+		public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty4Plugin.class));
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..2170771
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}, which is given {@link HttpHost}s to connect to.
+ * NOTE(review): no method below carries a JUnit {@code @Test} annotation and {@code org.junit.Test}
+ * is not imported, so nothing in this file triggers these checks -- confirm who invokes them.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+	/** Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}. */
+	public void runTransportClientTest() throws Exception {
+		final String index = "transport-client-test-index";
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(flushAfterEveryElementConfig(),
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+		env.execute("Elasticsearch RestHighLevelClient Test");
+
+		// verify the results; the client is closed even when the verification throws
+		Client client = embeddedNodeEnv.getClient();
+		try {
+			SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+		} finally {
+			client.close();
+		}
+	}
+
+	/** Tests that the Elasticsearch sink fails eagerly if the provided list of HTTP hosts is {@code null}. */
+	public void runNullTransportClientTest() throws Exception {
+		try {
+			createElasticsearchSink6(flushAfterEveryElementConfig(), null,
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail("Expected an IllegalArgumentException for a null list of HTTP hosts.");
+	}
+
+	/** Tests that the Elasticsearch sink fails eagerly if the provided list of HTTP hosts is empty. */
+	public void runEmptyTransportClientTest() throws Exception {
+		try {
+			createElasticsearchSink6(flushAfterEveryElementConfig(),
+				Collections.<HttpHost>emptyList(),
+				new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+		} catch (IllegalArgumentException expectedException) {
+			// test passes
+			return;
+		}
+
+		fail("Expected an IllegalArgumentException for an empty list of HTTP hosts.");
+	}
+
+	/**
+	 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
+	 * NOTE(review): the sink is built exactly as in runTransportClientTest(); confirm what makes the cluster unreachable.
+	 */
+	public void runTransportClientFailsTest() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+		source.addSink(createElasticsearchSinkForEmbeddedNode(flushAfterEveryElementConfig(),
+			new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+		try {
+			env.execute("Elasticsearch Transport Client Test");
+		} catch (JobExecutionException expectedException) {
+			// a missing cause or message must fail the assertion instead of raising a NullPointerException
+			Throwable cause = expectedException.getCause();
+			String message = (cause == null) ? null : cause.getMessage();
+			assertTrue("Unexpected failure cause: " + cause,
+				message != null && message.contains("not connected to any Elasticsearch nodes"));
+			return;
+		}
+
+		fail("Expected the job to fail with a JobExecutionException.");
+	}
+
+	/** Returns {@code null}: this suite builds its sinks from {@link HttpHost}s, not from socket addresses. */
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return null;
+	}
+
+	/** Creates a sink that targets {@code http://127.0.0.1:9200}. */
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+		List<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+		return createElasticsearchSink6(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+
+	/** Creates an {@link ElasticsearchSink} for the given HTTP hosts. */
+	private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
+		Map<String, String> userConfig,
+		List<HttpHost> httpHosts,
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		return new ElasticsearchSink<>(userConfig, httpHosts, elasticsearchSinkFunction);
+	}
+
+	/** Builds a fresh user config that instructs the sink to emit after every element instead of buffering. */
+	private static Map<String, String> flushAfterEveryElementConfig() {
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		return userConfig;
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..de1670f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Example job that writes a short sequence of messages to Elasticsearch with the {@link ElasticsearchSink}.
+ * Before running it, make sure an Elasticsearch node is reachable at {@code http://127.0.0.1:9200},
+ * or change the {@link HttpHost} that is passed to the sink.
+ */
+public class ElasticsearchSinkExample {
+
+	/** Builds and runs the example job. */
+	public static void main(String[] args) throws Exception {
+		// With this setting the sink emits after every element, otherwise they would be buffered
+		Map<String, String> sinkConfig = new HashMap<>();
+		sinkConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<HttpHost> hosts = new ArrayList<>();
+		hosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+		// Turns every message into one index request
+		ElasticsearchSinkFunction<String> indexMessage = new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element));
+			}
+		};
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<String> messages = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+			@Override
+			public String map(Long value) throws Exception {
+				return "message #" + value;
+			}
+		});
+		messages.addSink(new ElasticsearchSink<>(sinkConfig, hosts, indexMessage));
+
+		env.execute("Elasticsearch Sink Example");
+	}
+
+	/** Builds the index request for {@code element}, which serves as both document id and payload. */
+	private static IndexRequest createIndexRequest(String element) {
+		Map<String, Object> document = new HashMap<>();
+		document.put("data", element);
+
+		IndexRequest request = Requests.indexRequest().index("my-index").type("my-type");
+		return request.id(element).source(document);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2055184
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3afb779..cacea91 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
 		<module>flink-connector-elasticsearch</module>
 		<module>flink-connector-elasticsearch2</module>
 		<module>flink-connector-elasticsearch5</module>
+		<module>flink-connector-elasticsearch6</module>
 		<module>flink-connector-rabbitmq</module>
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>