You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/13 11:27:31 UTC
[flink] branch release-1.6 updated: [FLINK-10269] [connectors] Fix
Elasticsearch 6 UpdateRequest binary incompatibility
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new f3d6fac [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
f3d6fac is described below
commit f3d6fac22ff160b53052d384d8d0c231557fcf3e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Sep 12 12:21:34 2018 +0200
[FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
This commit fixes the binary incompatibility for UpdateRequests in Elasticsearch. This
is due to a binary compatibility issue between the base module (which is compiled
against a very old ES version) and the current Elasticsearch version.
It lets the API call bridge also provide a version-specific RequestIndexer.
This closes #6682.
---
.../elasticsearch/ElasticsearchApiCallBridge.java | 14 ++++
.../elasticsearch/ElasticsearchSinkBase.java | 4 +-
.../PreElasticsearch6BulkProcessorIndexer.java | 84 +++++++++++++++++++++
.../Elasticsearch6ApiCallBridge.java | 13 ++++
.../Elasticsearch6BulkProcessorIndexer.java | 85 ++++++++++++++++++++++
.../streaming/tests/Elasticsearch1SinkExample.java | 42 ++++++++---
.../streaming/tests/Elasticsearch2SinkExample.java | 43 ++++++++---
.../streaming/tests/Elasticsearch5SinkExample.java | 42 ++++++++---
.../streaming/tests/Elasticsearch6SinkExample.java | 35 +++++++--
.../test-scripts/test_streaming_elasticsearch.sh | 3 +-
10 files changed, 319 insertions(+), 46 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index f1dcc83..d3b774c 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
* An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
@@ -80,6 +81,19 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
/**
+ * Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary compatible.
+ */
+ default RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new PreElasticsearch6BulkProcessorIndexer(
+ bulkProcessor,
+ flushOnCheckpoint,
+ numPendingRequestsRef);
+ }
+
+ /**
* Perform any necessary state cleanup.
*/
default void cleanup() {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7dac06c..4d0c002 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -164,7 +164,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
private boolean flushOnCheckpoint = true;
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
- private transient BulkProcessorIndexer requestIndexer;
+ private transient RequestIndexer requestIndexer;
// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
@@ -295,7 +295,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
- requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+ requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..85f4b9a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions
+ * (i.e. the {@link #add(UpdateRequest...)} ). However, this module is currently
+ * compiled against a very old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 03bf9c0..782cbbc 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.elasticsearch6;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.Preconditions;
import org.apache.http.HttpHost;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
@@ -126,4 +128,15 @@ public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<R
builder.setBackoffPolicy(backoffPolicy);
}
+
+ @Override
+ public RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new Elasticsearch6BulkProcessorIndexer(
+ bulkProcessor,
+ flushOnCheckpoint,
+ numPendingRequestsRef);
+ }
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 0000000..af3c5b1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible to Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ Elasticsearch6BulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index 18fa05a..21c53ed 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -17,16 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@@ -56,11 +58,14 @@ public class Elasticsearch1SinkExample {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message # " + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -72,12 +77,13 @@ public class Elasticsearch1SinkExample {
List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch1.x end to end sink test example");
}
@@ -92,4 +98,16 @@ public class Elasticsearch1SinkExample {
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index f7532b1a..f8f390e 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -17,15 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.net.InetAddress;
@@ -54,11 +57,14 @@ public class Elasticsearch2SinkExample {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -70,12 +76,13 @@ public class Elasticsearch2SinkExample {
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
- @Override
- public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch2.x end to end sink test example");
}
@@ -90,4 +97,16 @@ public class Elasticsearch2SinkExample {
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 39808f6..893d366 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -17,16 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.net.InetAddress;
@@ -55,11 +57,14 @@ public class Elasticsearch5SinkExample {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -71,12 +76,13 @@ public class Elasticsearch5SinkExample {
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch5.x end to end sink test example");
}
@@ -91,4 +97,16 @@ public class Elasticsearch5SinkExample {
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index dedcbb2..e813c29 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -17,16 +17,19 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
@@ -53,20 +56,26 @@ public class Elasticsearch6SinkExample {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
- ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+ ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
- (String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ });
// this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);
@@ -86,4 +95,16 @@ public class Elasticsearch6SinkExample {
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index c8cd2db..800c4e2 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -45,4 +45,5 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
--index index \
--type type
-verify_result 20 index
+# 40 index requests and 20 final update requests
+verify_result 60 index