Posted to commits@flink.apache.org by di...@apache.org on 2022/07/15 03:39:24 UTC

[flink-connector-elasticsearch] branch main updated: [FLINK-18887][connectors/elasticsearch][docs][docs-zh] Add documentation for Python DataStream API (#24)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new cbeb081  [FLINK-18887][connectors/elasticsearch][docs][docs-zh] Add documentation for Python DataStream API (#24)
cbeb081 is described below

commit cbeb08187b3a1d89b29e86a27be8ddb60778b9c1
Author: Luning (Lucas) Wang <wa...@gmail.com>
AuthorDate: Fri Jul 15 11:39:20 2022 +0800

    [FLINK-18887][connectors/elasticsearch][docs][docs-zh] Add documentation for Python DataStream API (#24)
---
 .../docs/connectors/datastream/elasticsearch.md    | 136 +++++++++++++++++++++
 .../docs/connectors/datastream/elasticsearch.md    | 119 +++++++++++++++++-
 2 files changed, 254 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/connectors/datastream/elasticsearch.md b/docs/content.zh/docs/connectors/datastream/elasticsearch.md
index aea933e..1f7106d 100644
--- a/docs/content.zh/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/datastream/elasticsearch.md
@@ -197,6 +197,82 @@ def createIndexRequest(element: (String)): IndexRequest = {
   Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
 }
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+Elasticsearch 6 static index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# set_bulk_flush_max_actions(1) instructs the sink to emit after every element; otherwise elements would be buffered
+es6_sink = Elasticsearch6SinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es6_sink).name('es6 sink')
+```
+
+Elasticsearch 6 dynamic index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+es6_sink = Elasticsearch6SinkBuilder() \
+    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es6_sink).name('es6 dynamic index sink')
+```
+
+Elasticsearch 7 static index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# set_bulk_flush_max_actions(1) instructs the sink to emit after every element; otherwise elements would be buffered
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 sink')
+```
+
+Elasticsearch 7 dynamic index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 dynamic index sink')
+```
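+
+The built-in emitters address fields of each record by name, which implies map-typed input: 'id' supplies the document id and, with dynamic_index, the value of 'name' selects the target index. A minimal sketch of a compatible input stream (the field names and values are illustrative assumptions, not part of the connector API):
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Illustrative map-typed records: 'id' becomes the document id; with
+# dynamic_index, the value of 'name' picks the index.
+input = env.from_collection(
+    [{'name': 'index-a', 'id': '1'}, {'name': 'index-b', 'id': '2'}],
+    type_info=Types.MAP(Types.STRING(), Types.STRING()))
+```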
+
 {{< /tab >}}
 {{< /tabs >}}
 
@@ -269,6 +345,34 @@ val sinkBuilder = new Elasticsearch7SinkBuilder[String]
   .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
   indexer.add(createIndexRequest(element)))
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+Elasticsearch 6:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.base import DeliveryGuarantee
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+
+sink_builder = Elasticsearch6SinkBuilder() \
+    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+    .set_hosts(['localhost:9200'])
+```
+
+Elasticsearch 7:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.base import DeliveryGuarantee
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+
+sink_builder = Elasticsearch7SinkBuilder() \
+    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200'])
+```
+
 {{< /tab >}}
 {{< /tabs >}}
 
@@ -343,6 +447,38 @@ input.sinkTo(
     .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
     .build())
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+Elasticsearch 6:
+```python
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter, FlushBackoffType
+
+input = ...
+
+# This enables a constant backoff retry mechanism, with a maximum of 5 retries and a delay of 1000 milliseconds between retries
+es_sink = Elasticsearch6SinkBuilder() \
+    .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 5, 1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es_sink).name('es6 sink')
+```
+
+Elasticsearch 7:
+```python
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
+
+input = ...
+
+# This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 sink')
+```
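+
+Beyond the backoff strategy, the same builder exposes the other bulk flush thresholds. A brief sketch (the values are illustrative; the setters are assumed to mirror the Java builder's setBulkFlushMaxSizeMb and setBulkFlushInterval options):
+```python
+# Flush once 500 actions, 5 MB of data, or 1000 ms have accumulated, whichever comes first.
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_max_actions(500) \
+    .set_bulk_flush_max_size_mb(5) \
+    .set_bulk_flush_interval(1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+```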
+
 {{< /tab >}}
 {{< /tabs >}}
 
diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index d467bb3..34bdfb3 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -197,6 +197,82 @@ def createIndexRequest(element: (String)): IndexRequest = {
   Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
 }
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+Elasticsearch 6 static index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# set_bulk_flush_max_actions(1) instructs the sink to emit after every element; otherwise elements would be buffered
+es6_sink = Elasticsearch6SinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es6_sink).name('es6 sink')
+```
+
+Elasticsearch 6 dynamic index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+es6_sink = Elasticsearch6SinkBuilder() \
+    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es6_sink).name('es6 dynamic index sink')
+```
+
+Elasticsearch 7 static index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# set_bulk_flush_max_actions(1) instructs the sink to emit after every element; otherwise elements would be buffered
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 sink')
+```
+
+Elasticsearch 7 dynamic index:
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 dynamic index sink')
+```
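+
+The built-in emitters address fields of each record by name, which implies map-typed input: 'id' supplies the document id and, with dynamic_index, the value of 'name' selects the target index. A minimal sketch of a compatible input stream (the field names and values are illustrative assumptions, not part of the connector API):
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Illustrative map-typed records: 'id' becomes the document id; with
+# dynamic_index, the value of 'name' picks the index.
+input = env.from_collection(
+    [{'name': 'index-a', 'id': '1'}, {'name': 'index-b', 'id': '2'}],
+    type_info=Types.MAP(Types.STRING(), Types.STRING()))
+```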
+
 {{< /tab >}}
 {{< /tabs >}}
 
@@ -237,6 +313,16 @@ Elasticsearch 6:
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // checkpoint every 5000 msecs
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+from pyflink.datastream import StreamExecutionEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+```
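+
+With checkpointing enabled, the sink can then be configured for at-least-once delivery. A minimal sketch, mirroring the builder calls used elsewhere on this page ('foo' and 'id' are the same placeholder emitter arguments):
+```python
+from pyflink.datastream.connectors.base import DeliveryGuarantee
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
+
+sink_builder = Elasticsearch7SinkBuilder() \
+    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200'])
+```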
+
 {{< /tab >}}
 {{< /tabs >}}
 
@@ -250,7 +336,6 @@ By default, the BulkProcessor will flush after 1000 added Actions. To configure
 Using UpdateRequests with deterministic ids and the upsert method it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector.
 </p>
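 
 A hedged Python analogue of this UpdateRequest/upsert approach: the built-in Python emitters key every request by their key_field argument, so making that field deterministic lets writes replayed after a failure overwrite the same document instead of duplicating it. A sketch, assuming map-typed records, illustrative field names ('user', 'day'), and a hypothetical es7_sink built from the at-least-once builder above:
 ```python
 from pyflink.common.typeinfo import Types
 
 def with_deterministic_id(record):
     # Derive the document id from stable business fields (illustrative names).
     record['id'] = record['user'] + '-' + record['day']
     return record
 
 # 'id' is the emitter's key_field, so retried writes target the same document.
 input.map(with_deterministic_id,
           output_type=Types.MAP(Types.STRING(), Types.STRING())) \
     .sink_to(es7_sink)
 ```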
 
-
 ### Handling Failing Elasticsearch Requests
 
 Elasticsearch action requests may fail due to a variety of reasons, including
@@ -319,6 +404,38 @@ input.sinkTo(
     .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
     .build())
 ```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+Elasticsearch 6:
+```python
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter, FlushBackoffType
+
+input = ...
+
+# This enables a constant backoff retry mechanism, with a maximum of 5 retries and a delay of 1000 milliseconds between retries
+es_sink = Elasticsearch6SinkBuilder() \
+    .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 5, 1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es_sink).name('es6 sink')
+```
+
+Elasticsearch 7:
+```python
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
+
+input = ...
+
+# This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(es7_sink).name('es7 sink')
+```
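+
+Beyond the backoff strategy, the same builder exposes the other bulk flush thresholds. A brief sketch (the values are illustrative; the setters are assumed to mirror the Java builder's setBulkFlushMaxSizeMb and setBulkFlushInterval options):
+```python
+# Flush once 500 actions, 5 MB of data, or 1000 ms have accumulated, whichever comes first.
+es7_sink = Elasticsearch7SinkBuilder() \
+    .set_bulk_flush_max_actions(500) \
+    .set_bulk_flush_max_size_mb(5) \
+    .set_bulk_flush_interval(1000) \
+    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+```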
+
 {{< /tab >}}
 {{< /tabs >}}