Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/30 08:30:07 UTC

[GitHub] [flink] MartijnVisser commented on a change in pull request #19277: [FLINK-26281][BP 1.15][connectors/elasticsearch] Improve documentation

MartijnVisser commented on a change in pull request #19277:
URL: https://github.com/apache/flink/pull/19277#discussion_r838262122



##########
File path: docs/content.zh/docs/connectors/datastream/elasticsearch.md
##########
@@ -220,23 +219,61 @@ def createIndexRequest(element: (String)): IndexRequest = {
 
 {{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
 {{< tab "Java" >}}
+Elasticsearch 6:
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 ms
+
+Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+    (element, context, indexer) -> 
+    indexer.add(createIndexRequest(element)));
+```
+
+Elasticsearch 7:
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000); // checkpoint every 5000 ms
+
+Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter(
+    (element, context, indexer) -> 
+    indexer.add(createIndexRequest(element)));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
+Elasticsearch 6:
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // checkpoint every 5000 ms
+
+val sinkBuilder = new Elasticsearch6SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+  indexer.add(createIndexRequest(element)))
+```
+
+Elasticsearch 7:
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 ms
+
+val sinkBuilder = new Elasticsearch7SinkBuilder[String]
+  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+  indexer.add(createIndexRequest(element)))
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
 <p style="border-radius: 5px; padding: 5px" class="bg-info">
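-<b>Important</b>: Checkpointing is not enabled by default, but the default delivery guarantee is AT_LEAST_ONCE.
-This causes the sink to buffer requests until it finishes or the BulkProcessor flushes automatically.
-By default, the BulkProcessor flushes after 1000 add operations. To configure the Processor to flush more frequently, see the <a href="#配置内部批量处理器">BulkProcessor configuration section</a>.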
+Using UpdateRequests with deterministic ids and the upsert method it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector.
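
The idea in the added line above can be illustrated without the connector. The following is a minimal, stdlib-only sketch, not the connector's implementation: a `HashMap` stands in for an Elasticsearch index, and `DeterministicUpsertSketch`, `documentId`, and `upsert` are hypothetical names introduced only for this illustration. It assumes the document id is derived from the element's content, so a record that is redelivered under AT_LEAST_ONCE overwrites its earlier copy instead of creating a duplicate.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch: a Map stands in for an Elasticsearch index (assumption).
final class DeterministicUpsertSketch {

    // Hypothetical helper: derive the document id from the element's content,
    // so a replayed element always maps to the same id.
    static String documentId(String element) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(element.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is unavailable", e);
        }
    }

    // Upsert semantics: insert the document if the id is absent, overwrite it otherwise.
    static void upsert(Map<String, String> index, String element) {
        index.put(documentId(element), element);
    }

    public static void main(String[] args) {
        Map<String, String> index = new HashMap<>();
        // "b" is delivered twice, as can happen after a recovery under AT_LEAST_ONCE.
        for (String element : List.of("a", "b", "b")) {
            upsert(index, element);
        }
        System.out.println(index.size()); // prints 2
    }
}
```

In the real sink, the same effect would come from emitting an update request that carries a deterministic id and an upsert document, rather than an index request with an auto-generated id; the exact request construction depends on the Elasticsearch client version and is not shown here.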

Review comment:
       Yes, I'm going to create a follow-up ticket to make sure that the entire Chinese translation gets reviewed and, where necessary, translated.



