Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/29 17:26:07 UTC

[flink] branch master updated: [FLINK-26281][connectors/elasticsearch] Remove deprecated 'type', explain exactly-once semantic. This closes #19200

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a35240  [FLINK-26281][connectors/elasticsearch] Remove deprecated 'type', explain exactly-once semantic. This closes #19200
9a35240 is described below

commit 9a3524008c4a2d955ccafb6e2ece39db37c2e3df
Author: Alexander Preuß <11...@users.noreply.github.com>
AuthorDate: Tue Mar 22 14:24:00 2022 +0100

    [FLINK-26281][connectors/elasticsearch] Remove deprecated 'type', explain exactly-once semantic. This closes #19200
---
 .../docs/connectors/datastream/elasticsearch.md    | 47 +++++++++++++++++++---
 .../docs/connectors/datastream/elasticsearch.md    |  8 +++-
 2 files changed, 48 insertions(+), 7 deletions(-)
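The exactly-once note added by this commit relies on idempotent writes. Every record maps to a deterministic document id and is written as an upsert. Records that Flink replays from the last checkpoint under AT_LEAST_ONCE therefore overwrite the documents they already wrote instead of adding duplicates. The Java sketch below is a toy in-memory model of that argument. It is not the Elasticsearch client or the Flink connector API; the `Event` class, its `documentId()` rule, and the helper method names are hypothetical. It also simplifies an Elasticsearch upsert to "replace the stored document", which matches the outcome when each request carries the full document.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy, in-memory model (NOT the Elasticsearch client API) of why upserts
 * keyed by deterministic document ids tolerate at-least-once replays.
 */
public class UpsertIdempotenceSketch {

    /** Hypothetical stream record; its document id is derived from its own key. */
    static final class Event {
        final String key;
        final String data;

        Event(String key, String data) {
            this.key = key;
            this.data = data;
        }

        /** Deterministic id: a replayed record always maps to the same document. */
        String documentId() {
            return key;
        }
    }

    /**
     * Upsert semantics: create the document if it is absent, otherwise replace
     * its contents. Replaying an event rewrites the same document.
     */
    static Map<String, String> upsertAll(List<Event> deliveries) {
        Map<String, String> index = new HashMap<>();
        for (Event e : deliveries) {
            index.put(e.documentId(), e.data);
        }
        return index;
    }

    /**
     * Contrast: plain inserts with generated ids store every delivery,
     * so each replayed event becomes a duplicate document.
     */
    static Map<String, String> insertWithGeneratedIds(List<Event> deliveries) {
        Map<String, String> index = new HashMap<>();
        int nextId = 0;
        for (Event e : deliveries) {
            index.put("auto-" + nextId++, e.data);
        }
        return index;
    }

    public static void main(String[] args) {
        Event a = new Event("a", "1");
        Event b = new Event("b", "2");
        Event c = new Event("c", "3");

        // Every event reaches the sink exactly once.
        List<Event> once = List.of(a, b, c);
        // At-least-once: the job fails after writing "c", restores the
        // checkpoint taken after "a", and replays "b" and "c".
        List<Event> replayed = List.of(a, b, c, b, c);

        System.out.println(upsertAll(once).equals(upsertAll(replayed)));
        System.out.println(insertWithGeneratedIds(replayed).size());
    }
}
```

Running `main` prints `true`, because the replayed run produces the same index as the single delivery, and then `5`, because with generated ids both replayed events are stored again. The argument assumes the id depends only on the record itself and that replayed records carry the same content they had the first time.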

diff --git a/docs/content.zh/docs/connectors/datastream/elasticsearch.md b/docs/content.zh/docs/connectors/datastream/elasticsearch.md
index 5688306..aea933e 100644
--- a/docs/content.zh/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/datastream/elasticsearch.md
@@ -132,7 +132,6 @@ private static IndexRequest createIndexRequest(String element) {
 
     return Requests.indexRequest()
         .index("my-index")
-        .type("my-type")
         .id(element)
         .source(json);
 }
@@ -165,7 +164,7 @@ def createIndexRequest(element: (String)): IndexRequest = {
     "data" -> element.asInstanceOf[AnyRef]
   )
 
-  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
 }
 ```
 
@@ -220,23 +219,61 @@ def createIndexRequest(element: (String)): IndexRequest = {
 
 {{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
 {{< tab "Java" >}}
+Elasticsearch 6:
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 milliseconds
+
+Elasticsearch6SinkBuilder<String> sinkBuilder = new Elasticsearch6SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+        (element, context, indexer) ->
+            indexer.add(createIndexRequest(element)));
+```
+
+Elasticsearch 7:
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000); // checkpoint every 5000 milliseconds
+
+Elasticsearch7SinkBuilder<String> sinkBuilder = new Elasticsearch7SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+        (element, context, indexer) ->
+            indexer.add(createIndexRequest(element)));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
+Elasticsearch 6:
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // checkpoint every 5000 milliseconds
+
+val sinkBuilder = new Elasticsearch6SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
+```
+
+Elasticsearch 7:
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 milliseconds
+
+val sinkBuilder = new Elasticsearch7SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+    indexer.add(createIndexRequest(element)))
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
 <p style="border-radius: 5px; padding: 5px" class="bg-info">
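-<b>Important</b>: Checkpointing is not enabled by default, but the default delivery guarantee is AT_LEAST_ONCE.
-This causes the sink to buffer requests until it either finishes or the BulkProcessor flushes automatically.
-By default, the BulkProcessor will flush after 1000 added actions. To configure the processor to flush more frequently, please refer to the <a href="#配置内部批量处理器">BulkProcessor configuration section</a>.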
+By using UpdateRequests with deterministic ids and the upsert method, it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector, because replayed requests overwrite the same documents instead of creating duplicates.
 </p>
 
 ### Handling Failing Elasticsearch Requests
diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index eb49392..d467bb3 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -134,7 +134,6 @@ private static IndexRequest createIndexRequest(String element) {
 
     return Requests.indexRequest()
         .index("my-index")
-        .type("my-type")
         .id(element)
         .source(json);
 }
@@ -195,7 +194,7 @@ def createIndexRequest(element: (String)): IndexRequest = {
     "data" -> element.asInstanceOf[AnyRef]
   )
 
-  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
 }
 ```
 {{< /tab >}}
@@ -247,6 +246,11 @@ This causes the sink to buffer requests until it either finishes or the BulkProc
 By default, the BulkProcessor will flush after 1000 added Actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
 </p>
 
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+By using UpdateRequests with deterministic ids and the upsert method, it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector, because replayed requests overwrite the same documents instead of creating duplicates.
+</p>
+
+
 ### Handling Failing Elasticsearch Requests
 
 Elasticsearch action requests may fail due to a variety of reasons, including