Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/13 11:27:31 UTC

[flink] branch release-1.6 updated: [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new f3d6fac  [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
f3d6fac is described below

commit f3d6fac22ff160b53052d384d8d0c231557fcf3e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Sep 12 12:21:34 2018 +0200

    [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
    
    This commit fixes the binary incompatibility for UpdateRequests in Elasticsearch.
    The incompatibility exists because the base module (which is compiled against a
    very old ES version) is not binary compatible with the current Elasticsearch
    version. The fix lets the API call bridge also provide a version-specific RequestIndexer.
    
    This closes #6682.
---
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 14 ++++
 .../elasticsearch/ElasticsearchSinkBase.java       |  4 +-
 .../PreElasticsearch6BulkProcessorIndexer.java     | 84 +++++++++++++++++++++
 .../Elasticsearch6ApiCallBridge.java               | 13 ++++
 .../Elasticsearch6BulkProcessorIndexer.java        | 85 ++++++++++++++++++++++
 .../streaming/tests/Elasticsearch1SinkExample.java | 42 ++++++++---
 .../streaming/tests/Elasticsearch2SinkExample.java | 43 ++++++++---
 .../streaming/tests/Elasticsearch5SinkExample.java | 42 ++++++++---
 .../streaming/tests/Elasticsearch6SinkExample.java | 35 +++++++--
 .../test-scripts/test_streaming_elasticsearch.sh   |  3 +-
 10 files changed, 319 insertions(+), 46 deletions(-)
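
For context, here is a minimal user-side sketch of the UpdateRequest usage that this
fix targets. The index, type, and host values are hypothetical, and the code is modeled
on the end-to-end test examples in the diff below rather than taken from the commit itself.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.update.UpdateRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Elasticsearch6UpsertSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Toy input; the end-to-end tests below derive it from generateSequence().
		DataStream<String> source = env.fromElements("a", "b", "c");

		List<HttpHost> httpHosts = new ArrayList<>();
		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
			httpHosts,
			(String element, RuntimeContext ctx, RequestIndexer indexer) -> {
				Map<String, Object> json = new HashMap<>();
				json.put("data", element);

				// Adding an UpdateRequest through the RequestIndexer is the call that
				// previously hit the binary incompatibility; with this commit the ES 6
				// bridge supplies an Elasticsearch6BulkProcessorIndexer instead.
				indexer.add(new UpdateRequest("my-index", "my-type", element)
					.doc(json)
					.upsert(json));
			});

		source.addSink(esSinkBuilder.build());
		env.execute("Elasticsearch 6 UpdateRequest sketch");
	}
}

The diff below makes ElasticsearchSinkBase obtain its RequestIndexer from the
version-specific API call bridge instead of always instantiating the indexer that is
compiled in the base module.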

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index f1dcc83..d3b774c 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
@@ -80,6 +81,19 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
 		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
 
 	/**
+	 * Creates a {@link RequestIndexer} that works with the given {@link BulkProcessor} in a binary-compatible way.
+	 */
+	default RequestIndexer createBulkProcessorIndexer(
+			BulkProcessor bulkProcessor,
+			boolean flushOnCheckpoint,
+			AtomicLong numPendingRequestsRef) {
+		return new PreElasticsearch6BulkProcessorIndexer(
+			bulkProcessor,
+			flushOnCheckpoint,
+			numPendingRequestsRef);
+	}
+
+	/**
 	 * Perform any necessary state cleanup.
 	 */
 	default void cleanup() {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7dac06c..4d0c002 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -164,7 +164,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
 	private boolean flushOnCheckpoint = true;
 
 	/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
-	private transient BulkProcessorIndexer requestIndexer;
+	private transient RequestIndexer requestIndexer;
 
 	// ------------------------------------------------------------------------
 	//  Internals for the Flink Elasticsearch Sink
@@ -295,7 +295,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
 	public void open(Configuration parameters) throws Exception {
 		client = callBridge.createClient(userConfig);
 		bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
-		requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+		requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..85f4b9a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions
+ *             (in particular, the {@link #add(UpdateRequest...)} method), because this
+ *             module is currently compiled against a very old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+	private final BulkProcessor bulkProcessor;
+	private final boolean flushOnCheckpoint;
+	private final AtomicLong numPendingRequestsRef;
+
+	PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+		this.bulkProcessor = checkNotNull(bulkProcessor);
+		this.flushOnCheckpoint = flushOnCheckpoint;
+		this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+	}
+
+	@Override
+	public void add(DeleteRequest... deleteRequests) {
+		for (DeleteRequest deleteRequest : deleteRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(deleteRequest);
+		}
+	}
+
+	@Override
+	public void add(IndexRequest... indexRequests) {
+		for (IndexRequest indexRequest : indexRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(indexRequest);
+		}
+	}
+
+	@Override
+	public void add(UpdateRequest... updateRequests) {
+		for (UpdateRequest updateRequest : updateRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(updateRequest);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 03bf9c0..782cbbc 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.elasticsearch6;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.http.HttpHost;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
@@ -126,4 +128,15 @@ public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<R
 
 		builder.setBackoffPolicy(backoffPolicy);
 	}
+
+	@Override
+	public RequestIndexer createBulkProcessorIndexer(
+			BulkProcessor bulkProcessor,
+			boolean flushOnCheckpoint,
+			AtomicLong numPendingRequestsRef) {
+		return new Elasticsearch6BulkProcessorIndexer(
+			bulkProcessor,
+			flushOnCheckpoint,
+			numPendingRequestsRef);
+	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..af3c5b1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible with Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+	private final BulkProcessor bulkProcessor;
+	private final boolean flushOnCheckpoint;
+	private final AtomicLong numPendingRequestsRef;
+
+	Elasticsearch6BulkProcessorIndexer(
+			BulkProcessor bulkProcessor,
+			boolean flushOnCheckpoint,
+			AtomicLong numPendingRequestsRef) {
+		this.bulkProcessor = checkNotNull(bulkProcessor);
+		this.flushOnCheckpoint = flushOnCheckpoint;
+		this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+	}
+
+	@Override
+	public void add(DeleteRequest... deleteRequests) {
+		for (DeleteRequest deleteRequest : deleteRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(deleteRequest);
+		}
+	}
+
+	@Override
+	public void add(IndexRequest... indexRequests) {
+		for (IndexRequest indexRequest : indexRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(indexRequest);
+		}
+	}
+
+	@Override
+	public void add(UpdateRequest... updateRequests) {
+		for (UpdateRequest updateRequest : updateRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add(updateRequest);
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index 18fa05a..21c53ed 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -17,16 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -56,11 +58,14 @@ public class Elasticsearch1SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
-			.map(new MapFunction<Long, String>() {
+		DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
 				@Override
-				public String map(Long value) throws Exception {
-					return "message # " + value;
+				public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+					final String key = String.valueOf(value);
+					final String message = "message #" + value;
+					out.collect(Tuple2.of(key, message + "update #1"));
+					out.collect(Tuple2.of(key, message + "update #2"));
 				}
 			});
 
@@ -72,12 +77,13 @@ public class Elasticsearch1SinkExample {
 		List<TransportAddress> transports = new ArrayList<>();
 		transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element, parameterTool));
-			}
-		}));
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+				indexer.add(createIndexRequest(element.f1, parameterTool));
+				indexer.add(createUpdateRequest(element, parameterTool));
+			}));
 
 		env.execute("Elasticsearch1.x end to end sink test example");
 	}
@@ -92,4 +98,16 @@ public class Elasticsearch1SinkExample {
 			.id(element)
 			.source(json);
 	}
+
+	private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element.f1);
+
+		return new UpdateRequest(
+				parameterTool.getRequired("index"),
+				parameterTool.getRequired("type"),
+				element.f0)
+			.doc(json)
+			.upsert(json);
+	}
 }
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index f7532b1a..f8f390e 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -17,15 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.net.InetAddress;
@@ -54,11 +57,14 @@ public class Elasticsearch2SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
-			.map(new MapFunction<Long, String>() {
+		DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
 				@Override
-				public String map(Long value) throws Exception {
-					return "message #" + value;
+				public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+					final String key = String.valueOf(value);
+					final String message = "message #" + value;
+					out.collect(Tuple2.of(key, message + "update #1"));
+					out.collect(Tuple2.of(key, message + "update #2"));
 				}
 			});
 
@@ -70,12 +76,13 @@ public class Elasticsearch2SinkExample {
 		List<InetSocketAddress> transports = new ArrayList<>();
 		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
-			@Override
-			public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element, parameterTool));
-			}
-		}));
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+				indexer.add(createIndexRequest(element.f1, parameterTool));
+				indexer.add(createUpdateRequest(element, parameterTool));
+			}));
 
 		env.execute("Elasticsearch2.x end to end sink test example");
 	}
@@ -90,4 +97,16 @@ public class Elasticsearch2SinkExample {
 			.id(element)
 			.source(json);
 	}
+
+	private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element.f1);
+
+		return new UpdateRequest(
+				parameterTool.getRequired("index"),
+				parameterTool.getRequired("type"),
+				element.f0)
+			.doc(json)
+			.upsert(json);
+	}
 }
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 39808f6..893d366 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -17,16 +17,18 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.net.InetAddress;
@@ -55,11 +57,14 @@ public class Elasticsearch5SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
-			.map(new MapFunction<Long, String>() {
+		DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
 				@Override
-				public String map(Long value) throws Exception {
-					return "message #" + value;
+				public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+					final String key = String.valueOf(value);
+					final String message = "message #" + value;
+					out.collect(Tuple2.of(key, message + "update #1"));
+					out.collect(Tuple2.of(key, message + "update #2"));
 				}
 			});
 
@@ -71,12 +76,13 @@ public class Elasticsearch5SinkExample {
 		List<InetSocketAddress> transports = new ArrayList<>();
 		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
 
-		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
-			@Override
-			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-				indexer.add(createIndexRequest(element, parameterTool));
-			}
-		}));
+		source.addSink(new ElasticsearchSink<>(
+			userConfig,
+			transports,
+			(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+				indexer.add(createIndexRequest(element.f1, parameterTool));
+				indexer.add(createUpdateRequest(element, parameterTool));
+			}));
 
 		env.execute("Elasticsearch5.x end to end sink test example");
 	}
@@ -91,4 +97,16 @@ public class Elasticsearch5SinkExample {
 			.id(element)
 			.source(json);
 	}
+
+	private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element.f1);
+
+		return new UpdateRequest(
+				parameterTool.getRequired("index"),
+				parameterTool.getRequired("type"),
+				element.f0)
+			.doc(json)
+			.upsert(json);
+	}
 }
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index dedcbb2..e813c29 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -17,16 +17,19 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.util.Collector;
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 
 import java.util.ArrayList;
@@ -53,20 +56,26 @@ public class Elasticsearch6SinkExample {
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(5000);
 
-		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
-			.map(new MapFunction<Long, String>() {
+		DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
 				@Override
-				public String map(Long value) throws Exception {
-					return "message #" + value;
+				public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+					final String key = String.valueOf(value);
+					final String message = "message #" + value;
+					out.collect(Tuple2.of(key, message + "update #1"));
+					out.collect(Tuple2.of(key, message + "update #2"));
 				}
 			});
 
 		List<HttpHost> httpHosts = new ArrayList<>();
 		httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
 
-		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+		ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
 			httpHosts,
-			(String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+			(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+				indexer.add(createIndexRequest(element.f1, parameterTool));
+				indexer.add(createUpdateRequest(element, parameterTool));
+			});
 
 		// this instructs the sink to emit after every element, otherwise they would be buffered
 		esSinkBuilder.setBulkFlushMaxActions(1);
@@ -86,4 +95,16 @@ public class Elasticsearch6SinkExample {
 			.id(element)
 			.source(json);
 	}
+
+	private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element.f1);
+
+		return new UpdateRequest(
+				parameterTool.getRequired("index"),
+				parameterTool.getRequired("type"),
+				element.f0)
+			.doc(json)
+			.upsert(json);
+	}
 }
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index c8cd2db..800c4e2 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -45,4 +45,5 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
   --index index \
   --type type
 
-verify_result 20 index
+# 40 index requests and 20 final update requests
+verify_result 60 index