You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/18 08:23:54 UTC

[GitHub] [flink] chenzihao5 commented on a change in pull request #19101: [FLINK-26634][docs-zh] Update Chinese version of Elasticsearch connector docs

chenzihao5 commented on a change in pull request #19101:
URL: https://github.com/apache/flink/pull/19101#discussion_r829780304



##########
File path: docs/content.zh/docs/connectors/datastream/elasticsearch.md
##########
@@ -210,72 +210,35 @@ def createIndexRequest(element: (String)): IndexRequest = {
 
 ### Elasticsearch Sinks 和容错
 
-默认情况下,Flink Elasticsearch Sink 不会提供任何传递健壮性的保障。
-用户可以选择启用 Elasticsearch sink 的 at-least-once 语义。
-
-通过启用 Flink checkpoint,Flink Elasticsearch Sink 可以保证至少一次将操作请求发送到 Elasticsearch 集群。
+通过启用 Flink checkpoint,Flink Elasticsearch Sink 保证至少一次将操作请求发送到 Elasticsearch 集群。
 这是通过在进行 checkpoint 时等待 `BulkProcessor` 中所有挂起的操作请求来实现。
 这有效地保证了在触发 checkpoint 之前所有的请求被 Elasticsearch 成功确认,然后继续处理发送到 sink 的记录。
 
 关于 checkpoint 和容错的更多详细信息,请参见[容错文档]({{< ref "docs/learn-flink/fault_tolerance" >}})。
 
-要使用具有容错特性的 Elasticsearch Sinks,需要配置启用 at-least-once 分发并且在执行环境中启用作业拓扑的 checkpoint:
+要使用具有容错特性的 Elasticsearch Sinks,需要在执行环境中启用作业拓扑的 checkpoint:
 
 {{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
 {{< tab "Java" >}}
-Elasticsearch 6:
-```java
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint
-
-Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
-    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-    .setEmitter(
-    (element, context, indexer) -> 
-    indexer.add(createIndexRequest(element)));
-```
-
-Elasticsearch 7:
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint
-
-Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
-    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-    .setEmitter(
-    (element, context, indexer) -> 
-    indexer.add(createIndexRequest(element)));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
-Elasticsearch 6:
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint
-
-val sinkBuilder = new Elasticsearch6SinkBuilder[String]
-  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
-  indexer.add(createIndexRequest(element)))
-```
-
-Elasticsearch 7:
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint
-
-val sinkBuilder = new Elasticsearch7SinkBuilder[String]
-  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
-  indexer.add(createIndexRequest(element)))
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>重要提示</b>: 默认情况下不启用 checkpoint, 但默认传输保证 AT_LEAST_ONCE 语义。
+这会导致接收器缓冲请求直到完成,或 BulkProcessor 自动刷新。

Review comment:
       Thanks for your revisions. Please remove the space after the comma.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org