You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/06/17 06:57:06 UTC
[flink] branch master updated: [FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 72ef7e01054 [FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API
72ef7e01054 is described below
commit 72ef7e010546f41f8fa7ac01cdb3f9a90f100ac2
Author: Ada Wong <rs...@foxmail.com>
AuthorDate: Fri Dec 31 17:40:19 2021 +0800
[FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API
This closes #19732.
---
.../sink/SimpleElasticsearchEmitter.java | 95 +++++++
flink-python/pyflink/datastream/__init__.py | 2 +
.../pyflink/datastream/connectors/__init__.py | 6 +-
.../pyflink/datastream/connectors/elasticsearch.py | 298 +++++++++++++++++++++
.../pyflink/datastream/tests/test_connectors.py | 81 ++++++
.../datastream/connectors/elasticsearch.py | 140 ++++++++++
6 files changed, 621 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java
new file mode 100644
index 00000000000..8d51a381cd5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Function;
+
+/** A ElasticsearchEmitter that is currently used Python Flink Connector. */
+public class SimpleElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {
+
+ private static final long serialVersionUID = 1L;
+ private Function<Map<String, Object>, UpdateRequest> requestGenerator;
+
+ public SimpleElasticsearchEmitter(
+ String index, String documentType, String idFieldName, boolean isDynamicIndex) {
+ // If this issue resolve https://issues.apache.org/jira/browse/MSHADE-260
+ // we can replace requestGenerator with lambda.
+ // Other corresponding issues https://issues.apache.org/jira/browse/FLINK-18857 and
+ // https://issues.apache.org/jira/browse/FLINK-18006
+ if (isDynamicIndex) {
+ this.requestGenerator =
+ new DynamicIndexRequestGenerator(index, documentType, idFieldName);
+ } else {
+ this.requestGenerator =
+ new StaticIndexRequestGenerator(index, documentType, idFieldName);
+ }
+ }
+
+ public void emit(
+ Map<String, Object> element, SinkWriter.Context context, RequestIndexer indexer) {
+ indexer.add(requestGenerator.apply(element));
+ }
+
+ private static class StaticIndexRequestGenerator
+ implements Function<Map<String, Object>, UpdateRequest>, Serializable {
+ private String index;
+ private String documentType;
+ private String idFieldName;
+
+ public StaticIndexRequestGenerator(String index, String documentType, String idFieldName) {
+ this.index = index;
+ this.documentType = documentType;
+ this.idFieldName = idFieldName;
+ }
+
+ public UpdateRequest apply(Map<String, Object> doc) {
+ return new UpdateRequest(index, documentType, doc.get(idFieldName).toString())
+ .doc(doc)
+ .upsert(doc);
+ }
+ }
+
+ private static class DynamicIndexRequestGenerator
+ implements Function<Map<String, Object>, UpdateRequest>, Serializable {
+ private String index;
+ private String documentType;
+ private String idFieldName;
+
+ public DynamicIndexRequestGenerator(String index, String documentType, String idFieldName) {
+ this.index = index;
+ this.documentType = documentType;
+ this.idFieldName = idFieldName;
+ }
+
+ public UpdateRequest apply(Map<String, Object> doc) {
+ return new UpdateRequest(
+ doc.get(index).toString(),
+ documentType,
+ doc.get(idFieldName).toString())
+ .doc(doc)
+ .upsert(doc);
+ }
+ }
+}
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 053ac8108a1..8e5ea1443f4 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -151,6 +151,8 @@ Classes for state operations:
Classes to define source & sink:
+ - :class:`connectors.elasticsearch.ElasticsearchSink`:
+ A sink for publishing data into Elasticsearch 6 or Elasticsearch 7.
- :class:`connectors.FlinkKafkaConsumer`:
A streaming data source that pulls a parallel data stream from Apache Kafka.
- :class:`connectors.FlinkKafkaProducer`:
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index c0da99d2589..c67493398be 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -16,6 +16,8 @@
# limitations under the License.
################################################################################
from pyflink.datastream.connectors.base import Sink, Source, DeliveryGuarantee
+from pyflink.datastream.connectors.elasticsearch import (Elasticsearch6SinkBuilder,
+ Elasticsearch7SinkBuilder)
from pyflink.datastream.connectors.file_system import (FileEnumeratorProvider, FileSink, FileSource,
BucketAssigner, FileSourceBuilder,
FileSplitAssignerProvider, OutputFileConfig,
@@ -69,5 +71,7 @@ __all__ = [
'StreamingFileSink',
'FlinkKinesisConsumer',
'KinesisStreamsSink',
- 'KinesisFirehoseSink'
+ 'KinesisFirehoseSink',
+ 'Elasticsearch6SinkBuilder',
+ 'Elasticsearch7SinkBuilder'
]
diff --git a/flink-python/pyflink/datastream/connectors/elasticsearch.py b/flink-python/pyflink/datastream/connectors/elasticsearch.py
new file mode 100644
index 00000000000..5d9c58c1dc0
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/elasticsearch.py
@@ -0,0 +1,298 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+from enum import Enum
+from typing import List, Union
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray
+
+
# Public names exported by this module.
__all__ = [
    'FlushBackoffType',
    'ElasticsearchEmitter',
    'Elasticsearch6SinkBuilder',
    'Elasticsearch7SinkBuilder',
    'ElasticsearchSink',
]
+
+
class FlushBackoffType(Enum):
    """
    Used to control whether the sink should retry failed requests at all, and if so, with which
    kind of back off strategy.

    :data: `CONSTANT`:

    After every failure, it waits a configured time until the retries are exhausted.

    :data: `EXPONENTIAL`:

    After every failure, it waits initially the configured time and increases the waiting time
    exponentially until the retries are exhausted.

    :data: `NONE`:

    The failure is not retried.
    """

    # Plain int values: a trailing comma here would make each value a one-element tuple.
    CONSTANT = 0
    EXPONENTIAL = 1
    NONE = 2

    def _to_j_flush_backoff_type(self):
        """
        Returns the Java ``FlushBackoffType`` constant that has the same name as this member.
        """
        JFlushBackoffType = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
        return getattr(JFlushBackoffType, self.name)
+
+
class ElasticsearchEmitter(object):
    """
    Python handle for a Java emitter. Sinks use the emitter to convert each incoming element
    into the Elasticsearch actions sent to the cluster.
    """

    def __init__(self, j_emitter):
        # The wrapped Java emitter object.
        self._j_emitter = j_emitter

    @staticmethod
    def _create_simple_emitter(index, doc_type, key_field, is_dynamic_index):
        # Both public factories use the same Java class; only the dynamic-index flag differs.
        j_emitter_class = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter
        return ElasticsearchEmitter(j_emitter_class(index, doc_type, key_field, is_dynamic_index))

    @staticmethod
    def static_index(index: str, key_field: str = None, doc_type: str = None) \
            -> 'ElasticsearchEmitter':
        """
        Creates an emitter that writes every record to the same, fixed index. The emitter is
        invoked on every record to convert it to Elasticsearch actions.

        :param index: The name of the index to write to.
        :param key_field: The name of the record field whose value is the document id. The Java
                          SimpleElasticsearchEmitter needs this field to build each document id,
                          so it should be set.
        :param doc_type: The document type set on every request.
        """
        return ElasticsearchEmitter._create_simple_emitter(index, doc_type, key_field, False)

    @staticmethod
    def dynamic_index(index_field: str, key_field: str = None, doc_type: str = None) \
            -> 'ElasticsearchEmitter':
        """
        Creates an emitter that reads the target index name from a field of each record. The
        emitter is invoked on every record to convert it to Elasticsearch actions.

        :param index_field: The name of the record field whose value is the index name.
        :param key_field: The name of the record field whose value is the document id. The Java
                          SimpleElasticsearchEmitter needs this field to build each document id,
                          so it should be set.
        :param doc_type: The document type set on every request.
        """
        return ElasticsearchEmitter._create_simple_emitter(index_field, doc_type, key_field, True)
+
+
class ElasticsearchSinkBuilderBase(abc.ABC):
    """
    Base builder to construct an ElasticsearchSink.

    Concrete subclasses create the version-specific Java sink builder in ``__init__``. They also
    return the matching shaded ``HttpHost`` class from :meth:`get_http_host_class`. Every setter
    forwards to the Java builder and returns this builder, so calls can be chained.
    """

    @abc.abstractmethod
    def __init__(self):
        # Replaced by subclasses with the version-specific Java sink builder.
        self._j_elasticsearch_sink_builder = None

    @abc.abstractmethod
    def get_http_host_class(self):
        """
        Gets the org.apache.http.HttpHost class. Its shaded package path differs between
        Elasticsearch versions.
        """
        pass

    def set_emitter(self, emitter: ElasticsearchEmitter) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the emitter which is invoked on every record to convert it to Elasticsearch actions.

        :param emitter: The emitter to process records into Elasticsearch actions.
        """
        self._j_elasticsearch_sink_builder.setEmitter(emitter._j_emitter)
        return self

    def set_hosts(self, hosts: Union[str, List[str]]) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the hosts where the Elasticsearch cluster nodes are reachable.

        :param hosts: A single host such as ``'localhost:9200'``, or a list (or any other
                      iterable, e.g. a tuple) of hosts. Each host is parsed by
                      ``HttpHost.create``.
        """
        # A str is itself iterable, so only a bare string is wrapped. Lists, tuples and other
        # iterables are used as-is instead of being mistaken for a single host.
        if isinstance(hosts, str):
            hosts = [hosts]
        JHttpHost = self.get_http_host_class()
        j_http_hosts_list = [JHttpHost.create(x) for x in hosts]
        j_http_hosts_array = to_jarray(JHttpHost, j_http_hosts_list)
        self._j_elasticsearch_sink_builder.setHosts(j_http_hosts_array)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) \
            -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the wanted DeliveryGuarantee. The default delivery guarantee is DeliveryGuarantee#NONE.

        :param delivery_guarantee: The delivery guarantee the sink should provide.
        """
        j_delivery_guarantee = delivery_guarantee._to_j_delivery_guarantee()
        self._j_elasticsearch_sink_builder.setDeliveryGuarantee(j_delivery_guarantee)
        return self

    def set_bulk_flush_max_actions(self, num_max_actions: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
        disable it. The default is 1000 actions.

        :param num_max_actions: The maximum number of buffered actions per bulk request.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushMaxActions(num_max_actions)
        return self

    def set_bulk_flush_max_size_mb(self, max_size_mb: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the maximum size of buffered actions per bulk request, in MB. You can pass -1 to
        disable it.

        :param max_size_mb: The maximum size of buffered actions in MB.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushMaxSizeMb(max_size_mb)
        return self

    def set_bulk_flush_interval(self, interval_millis: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.

        :param interval_millis: The interval between bulk flushes, in milliseconds.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushInterval(interval_millis)
        return self

    def set_bulk_flush_backoff_strategy(self,
                                        flush_backoff_type: FlushBackoffType,
                                        max_retries: int,
                                        delay_millis: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the back off strategy used when flushing bulk requests. The default bulk flush back
        off type is FlushBackoffType#NONE.

        :param flush_backoff_type: The type of back off to use when flushing bulk requests.
        :param max_retries: The maximum number of retries for a backoff attempt when flushing bulk
                            requests.
        :param delay_millis: The amount of delay between each backoff attempt when flushing bulk
                             requests, in milliseconds.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushBackoffStrategy(
            flush_backoff_type._to_j_flush_backoff_type(), max_retries, delay_millis)
        return self

    def set_connection_username(self, username: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the username used to authenticate the connection with the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionUsername(username)
        return self

    def set_connection_password(self, password: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the password used to authenticate the connection with the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionPassword(password)
        return self

    def set_connection_path_prefix(self, prefix: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets a prefix which is used for every REST communication with the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionPathPrefix(prefix)
        return self

    def set_connection_request_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for requesting a connection to the Elasticsearch cluster from the
        connection manager.
        """
        self._j_elasticsearch_sink_builder.setConnectionRequestTimeout(timeout)
        return self

    def set_connection_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for establishing a connection to the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionTimeout(timeout)
        return self

    def set_socket_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for waiting for data or, put differently, the maximum period of
        inactivity between two consecutive data packets.
        """
        self._j_elasticsearch_sink_builder.setSocketTimeout(timeout)
        return self

    def build(self) -> 'ElasticsearchSink':
        """
        Constructs the ElasticsearchSink with the properties configured by this builder.
        """
        return ElasticsearchSink(self._j_elasticsearch_sink_builder.build())
+
+
class Elasticsearch6SinkBuilder(ElasticsearchSinkBuilderBase):
    """
    Builder to construct an Elasticsearch 6 compatible ElasticsearchSink.

    The following example shows the minimal setup to create an ElasticsearchSink. The sink
    submits actions on checkpoint, or when the default number of actions (1000) has been
    buffered. Elasticsearch 6 requests carry a document type, so ``doc_type`` is passed to the
    emitter as its third argument.

    Example:
    ::

        >>> sink = Elasticsearch6SinkBuilder() \\
        ...     .set_hosts('localhost:9200') \\
        ...     .set_emitter(ElasticsearchEmitter.static_index("user", "key_col", "doc")) \\
        ...     .build()
    """

    def __init__(self):
        self._j_elasticsearch_sink_builder = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder()

    def get_http_host_class(self):
        """
        Returns the HttpHost class as shaded into the Elasticsearch 6 connector.
        """
        return get_gateway().jvm.org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost
+
+
class Elasticsearch7SinkBuilder(ElasticsearchSinkBuilderBase):
    """
    Builder for an ElasticsearchSink that is compatible with Elasticsearch 7.

    Minimal example: the resulting sink submits actions on every checkpoint, or as soon as the
    default number of actions (1000) has been buffered.

    Example:
    ::

        >>> sink = Elasticsearch7SinkBuilder() \\
        ...     .set_hosts('localhost:9200') \\
        ...     .set_emitter(ElasticsearchEmitter.dynamic_index("index_col", "key_col")) \\
        ...     .build()
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self._j_elasticsearch_sink_builder = \
            jvm.org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder()

    def get_http_host_class(self):
        """
        Returns the HttpHost class as shaded into the Elasticsearch 7 connector.
        """
        jvm = get_gateway().jvm
        return jvm.org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost
+
+
class ElasticsearchSink(Sink):
    """
    Flink Sink that inserts or updates data in an Elasticsearch index. The sink supports the
    following delivery guarantees.

    DeliveryGuarantee.NONE gives no guarantees. Actions are flushed to Elasticsearch only
    according to the bulk processor configuration. If a failure happens while the bulk processor
    still has buffered actions, those actions may be lost.

    DeliveryGuarantee.AT_LEAST_ONCE makes the sink wait, on every checkpoint, until all buffered
    actions have been flushed to and acknowledged by Elasticsearch. No actions are lost, but
    some may be sent to Elasticsearch more than once when Flink restarts. These repeated requests
    can leave the data in Elasticsearch inconsistent right after the restart, but it eventually
    becomes consistent again.
    """

    def __init__(self, j_elasticsearch_sink):
        # Hand the built Java sink to the generic Sink wrapper.
        super().__init__(sink=j_elasticsearch_sink)
diff --git a/flink-python/pyflink/datastream/tests/test_connectors.py b/flink-python/pyflink/datastream/tests/test_connectors.py
index 6ff0d64019c..e0a1d2aa671 100644
--- a/flink-python/pyflink/datastream/tests/test_connectors.py
+++ b/flink-python/pyflink/datastream/tests/test_connectors.py
@@ -17,6 +17,9 @@
################################################################################
from abc import ABC, abstractmethod
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
+ FlushBackoffType, ElasticsearchEmitter
+
from pyflink.common import typeinfo, Duration, WatermarkStrategy, ConfigOptions
from pyflink.common.serialization import JsonRowDeserializationSchema, \
JsonRowSerializationSchema, Encoder, SimpleStringSchema
@@ -62,6 +65,84 @@ class ConnectorTestBase(PyFlinkTestCase, ABC):
get_gateway().jvm.Thread.currentThread().setContextClassLoader(self._cxt_clz_loader)
class FlinkElasticsearch7Test(ConnectorTestBase):

    @classmethod
    def _get_jars_relative_path(cls):
        return '/flink-connectors/flink-sql-connector-elasticsearch7'

    def _create_sample_stream(self):
        # Two small string-to-string map records shared by both tests.
        return self.env.from_collection(
            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
            type_info=Types.MAP(Types.STRING(), Types.STRING()))

    def _assert_simple_emitter(self, j_sink):
        # Both factories of ElasticsearchEmitter are backed by SimpleElasticsearchEmitter.
        j_emitter = get_field_value(j_sink, 'emitter')
        self.assertTrue(
            is_instance_of(
                j_emitter,
                'org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter'))

    def test_es_sink(self):
        ds = self._create_sample_stream()

        es_sink = Elasticsearch7SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
            .set_hosts(['localhost:9200']) \
            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
            .set_bulk_flush_max_actions(1) \
            .set_bulk_flush_max_size_mb(2) \
            .set_bulk_flush_interval(1000) \
            .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
            .set_connection_username('foo') \
            .set_connection_password('bar') \
            .set_connection_path_prefix('foo-bar') \
            .set_connection_request_timeout(30000) \
            .set_connection_timeout(31000) \
            .set_socket_timeout(32000) \
            .build()

        j_sink = es_sink.get_java_function()
        self._assert_simple_emitter(j_sink)
        self.assertEqual(
            get_field_value(j_sink, 'hosts')[0].toString(), 'http://localhost:9200')
        self.assertEqual(
            get_field_value(j_sink, 'deliveryGuarantee').toString(), 'at-least-once')

        j_bulk_config = get_field_value(j_sink, 'buildBulkProcessorConfig')
        self.assertEqual(j_bulk_config.getBulkFlushMaxActions(), 1)
        self.assertEqual(j_bulk_config.getBulkFlushMaxMb(), 2)
        self.assertEqual(j_bulk_config.getBulkFlushInterval(), 1000)
        self.assertEqual(j_bulk_config.getFlushBackoffType().toString(), 'CONSTANT')
        self.assertEqual(j_bulk_config.getBulkFlushBackoffRetries(), 3)
        self.assertEqual(j_bulk_config.getBulkFlushBackOffDelay(), 3000)

        j_client_config = get_field_value(j_sink, 'networkClientConfig')
        self.assertEqual(j_client_config.getUsername(), 'foo')
        self.assertEqual(j_client_config.getPassword(), 'bar')
        self.assertEqual(j_client_config.getConnectionRequestTimeout(), 30000)
        self.assertEqual(j_client_config.getConnectionTimeout(), 31000)
        self.assertEqual(j_client_config.getSocketTimeout(), 32000)
        self.assertEqual(j_client_config.getConnectionPathPrefix(), 'foo-bar')

        ds.sink_to(es_sink).name('es sink')

    def test_es_sink_dynamic(self):
        ds = self._create_sample_stream()

        es_dynamic_index_sink = Elasticsearch7SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
            .set_hosts(['localhost:9200']) \
            .build()

        self._assert_simple_emitter(es_dynamic_index_sink.get_java_function())

        ds.sink_to(es_dynamic_index_sink).name('es dynamic index sink')
+
+
class FlinkKafkaTest(ConnectorTestBase):
@classmethod
diff --git a/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py b/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py
new file mode 100644
index 00000000000..0836b1d18ab
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py
@@ -0,0 +1,140 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import sys
+
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
+ Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import DeliveryGuarantee
+
+
# Placeholder locations of the Elasticsearch SQL connector jars used by the examples below.
# Replace them with the real paths of the jars on your machine.
ES6_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
ES7_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'


def _create_sample_stream(env):
    """
    Creates a bounded stream of two ``{'name': ..., 'id': ...}`` records typed as a map from
    string to string.
    """
    return env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))


def _with_all_options(builder):
    """
    Applies every optional sink setting to ``builder`` so the example shows the whole builder
    API. Returns the builder.
    """
    return builder \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_username('foo') \
        .set_connection_password('bar') \
        .set_connection_path_prefix('foo-bar') \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000)


def write_to_es6(env):
    """
    Writes the sample records to the static Elasticsearch 6 index ``foo`` (document type
    ``bar``) with every optional sink setting configured.
    """
    env.add_jars(ES6_SQL_CONNECTOR_PATH)
    ds = _create_sample_stream(env)

    es_sink = _with_all_options(
        Elasticsearch6SinkBuilder()
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar'))
        .set_hosts(['localhost:9200'])
    ).build()

    ds.sink_to(es_sink).name('es6 sink')

    env.execute()


def write_to_es6_dynamic_index(env):
    """
    Writes the sample records to Elasticsearch 6, using each record's ``name`` field as the
    index name.
    """
    env.add_jars(ES6_SQL_CONNECTOR_PATH)
    ds = _create_sample_stream(env)

    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es_sink).name('es6 dynamic index sink')

    env.execute()


def write_to_es7(env):
    """
    Writes the sample records to the static Elasticsearch 7 index ``foo`` with every optional
    sink setting configured.
    """
    env.add_jars(ES7_SQL_CONNECTOR_PATH)
    ds = _create_sample_stream(env)

    es7_sink = _with_all_options(
        Elasticsearch7SinkBuilder()
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id'))
        .set_hosts(['localhost:9200'])
    ).build()

    ds.sink_to(es7_sink).name('es7 sink')

    env.execute()


def write_to_es7_dynamic_index(env):
    """
    Writes the sample records to Elasticsearch 7, using each record's ``name`` field as the
    index name.
    """
    env.add_jars(ES7_SQL_CONNECTOR_PATH)
    ds = _create_sample_stream(env)

    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es7_sink).name('es7 dynamic index sink')

    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # NOTE(review): the same environment accumulates both the Elasticsearch 6 and the
    # Elasticsearch 7 connector jars. If their bundled classes clash, run each version's
    # examples in a separate process.
    print("start writing data to elasticsearch6")
    write_to_es6(env)
    write_to_es6_dynamic_index(env)

    print("start writing data to elasticsearch7")
    write_to_es7(env)
    write_to_es7_dynamic_index(env)