Posted to commits@flink.apache.org by di...@apache.org on 2022/06/17 06:57:06 UTC

[flink] branch master updated: [FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ef7e01054 [FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API
72ef7e01054 is described below

commit 72ef7e010546f41f8fa7ac01cdb3f9a90f100ac2
Author: Ada Wong <rs...@foxmail.com>
AuthorDate: Fri Dec 31 17:40:19 2021 +0800

    [FLINK-18887][python][connector/elasticsearch] Add ElasticSearch connector for Python DataStream API
    
    This closes #19732.
---
 .../sink/SimpleElasticsearchEmitter.java           |  95 +++++++
 flink-python/pyflink/datastream/__init__.py        |   2 +
 .../pyflink/datastream/connectors/__init__.py      |   6 +-
 .../pyflink/datastream/connectors/elasticsearch.py | 298 +++++++++++++++++++++
 .../pyflink/datastream/tests/test_connectors.py    |  81 ++++++
 .../datastream/connectors/elasticsearch.py         | 140 ++++++++++
 6 files changed, 621 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java
new file mode 100644
index 00000000000..8d51a381cd5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An ElasticsearchEmitter that is currently used by the Flink Python connector. */
+public class SimpleElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {
+
+    private static final long serialVersionUID = 1L;
+    private Function<Map<String, Object>, UpdateRequest> requestGenerator;
+
+    public SimpleElasticsearchEmitter(
+            String index, String documentType, String idFieldName, boolean isDynamicIndex) {
+        // Once https://issues.apache.org/jira/browse/MSHADE-260 is resolved,
+        // we can replace requestGenerator with a lambda.
+        // Related issues: https://issues.apache.org/jira/browse/FLINK-18857 and
+        // https://issues.apache.org/jira/browse/FLINK-18006
+        if (isDynamicIndex) {
+            this.requestGenerator =
+                    new DynamicIndexRequestGenerator(index, documentType, idFieldName);
+        } else {
+            this.requestGenerator =
+                    new StaticIndexRequestGenerator(index, documentType, idFieldName);
+        }
+    }
+
+    public void emit(
+            Map<String, Object> element, SinkWriter.Context context, RequestIndexer indexer) {
+        indexer.add(requestGenerator.apply(element));
+    }
+
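+    /** Generates update requests against the fixed index configured at construction time. */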
+    private static class StaticIndexRequestGenerator
+            implements Function<Map<String, Object>, UpdateRequest>, Serializable {
+        private String index;
+        private String documentType;
+        private String idFieldName;
+
+        public StaticIndexRequestGenerator(String index, String documentType, String idFieldName) {
+            this.index = index;
+            this.documentType = documentType;
+            this.idFieldName = idFieldName;
+        }
+
+        public UpdateRequest apply(Map<String, Object> doc) {
+            return new UpdateRequest(index, documentType, doc.get(idFieldName).toString())
+                    .doc(doc)
+                    .upsert(doc);
+        }
+    }
+
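+    /**
+     * Generates update requests whose target index is read per record from the document field
+     * named by the {@code index} constructor argument.
+     */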
+    private static class DynamicIndexRequestGenerator
+            implements Function<Map<String, Object>, UpdateRequest>, Serializable {
+        private String index;
+        private String documentType;
+        private String idFieldName;
+
+        public DynamicIndexRequestGenerator(String index, String documentType, String idFieldName) {
+            this.index = index;
+            this.documentType = documentType;
+            this.idFieldName = idFieldName;
+        }
+
+        public UpdateRequest apply(Map<String, Object> doc) {
+            return new UpdateRequest(
+                            doc.get(index).toString(),
+                            documentType,
+                            doc.get(idFieldName).toString())
+                    .doc(doc)
+                    .upsert(doc);
+        }
+    }
+}
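
To make the static/dynamic distinction above concrete, here is a rough pure-Python
sketch of the emitter's per-record logic (illustrative only: the real conversion
happens in the Java class above, and the returned dict merely stands in for an
Elasticsearch UpdateRequest):

    # Illustrative sketch of SimpleElasticsearchEmitter's per-record logic. The dict is
    # a stand-in for an UpdateRequest; it is not part of the real API.
    def make_request(doc, index, doc_type, id_field_name, is_dynamic_index):
        # Dynamic: the target index is read from a field of the record itself.
        # Static: the target index is fixed at construction time.
        target_index = str(doc[index]) if is_dynamic_index else index
        return {'index': target_index, 'type': doc_type,
                'id': str(doc[id_field_name]), 'doc': doc, 'upsert': doc}

    # The same record routed statically vs. dynamically:
    record = {'name': 'ada', 'id': '1'}
    assert make_request(record, 'user', None, 'id', False)['index'] == 'user'
    assert make_request(record, 'name', None, 'id', True)['index'] == 'ada'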
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 053ac8108a1..8e5ea1443f4 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -151,6 +151,8 @@ Classes for state operations:
 
 Classes to define source & sink:
 
+    - :class:`connectors.elasticsearch.ElasticsearchSink`:
+      A sink for publishing data into Elasticsearch 6 or Elasticsearch 7.
     - :class:`connectors.FlinkKafkaConsumer`:
       A streaming data source that pulls a parallel data stream from Apache Kafka.
     - :class:`connectors.FlinkKafkaProducer`:
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index c0da99d2589..c67493398be 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -16,6 +16,8 @@
 # limitations under the License.
 ################################################################################
 from pyflink.datastream.connectors.base import Sink, Source, DeliveryGuarantee
+from pyflink.datastream.connectors.elasticsearch import (Elasticsearch6SinkBuilder,
+                                                         Elasticsearch7SinkBuilder)
 from pyflink.datastream.connectors.file_system import (FileEnumeratorProvider, FileSink, FileSource,
                                                        BucketAssigner, FileSourceBuilder,
                                                        FileSplitAssignerProvider, OutputFileConfig,
@@ -69,5 +71,7 @@ __all__ = [
     'StreamingFileSink',
     'FlinkKinesisConsumer',
     'KinesisStreamsSink',
-    'KinesisFirehoseSink'
+    'KinesisFirehoseSink',
+    'Elasticsearch6SinkBuilder',
+    'Elasticsearch7SinkBuilder'
 ]
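
With the re-export above, the new builders can be imported either from the connectors
package root or from the elasticsearch submodule. A quick sketch:

    from pyflink.datastream.connectors import Elasticsearch7SinkBuilder
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder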
diff --git a/flink-python/pyflink/datastream/connectors/elasticsearch.py b/flink-python/pyflink/datastream/connectors/elasticsearch.py
new file mode 100644
index 00000000000..5d9c58c1dc0
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/elasticsearch.py
@@ -0,0 +1,298 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import abc
+from enum import Enum
+from typing import List, Union
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray
+
+
+__all__ = ['FlushBackoffType',
+           'ElasticsearchEmitter',
+           'Elasticsearch6SinkBuilder',
+           'Elasticsearch7SinkBuilder',
+           'ElasticsearchSink']
+
+
+class FlushBackoffType(Enum):
+    """
+    Used to control whether the sink should retry failed requests at all, and with which kind
+    of back-off strategy.
+
+    :data:`CONSTANT`:
+
+    After every failure, it waits a configured time until the retries are exhausted.
+
+    :data:`EXPONENTIAL`:
+
+    After every failure, it initially waits the configured time and increases the waiting time
+    exponentially until the retries are exhausted.
+
+    :data:`NONE`:
+
+    The failure is not retried.
+    """
+
+    CONSTANT = 0
+    EXPONENTIAL = 1
+    NONE = 2
+
+    def _to_j_flush_backoff_type(self):
+        JFlushBackoffType = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
+        return getattr(JFlushBackoffType, self.name)
+
+
+class ElasticsearchEmitter(object):
+    """
+    Emitter which is used by sinks to prepare elements for sending them to Elasticsearch.
+    """
+
+    def __init__(self, j_emitter):
+        self._j_emitter = j_emitter
+
+    @staticmethod
+    def static_index(index: str, key_field: str = None, doc_type: str = None) \
+            -> 'ElasticsearchEmitter':
+        """
+        Creates an emitter with a static index, which is invoked on every record to convert it
+        into Elasticsearch actions.
+        """
+        JSimpleElasticsearchEmitter = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter
+        j_emitter = JSimpleElasticsearchEmitter(index, doc_type, key_field, False)
+        return ElasticsearchEmitter(j_emitter)
+
+    @staticmethod
+    def dynamic_index(index_field: str, key_field: str = None, doc_type: str = None) \
+            -> 'ElasticsearchEmitter':
+        """
+        Creates an emitter with a dynamic index, which is invoked on every record to convert it
+        into Elasticsearch actions. The target index is read from the given index field of each
+        record.
+        """
+        JSimpleElasticsearchEmitter = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter
+        j_emitter = JSimpleElasticsearchEmitter(index_field, doc_type, key_field, True)
+        return ElasticsearchEmitter(j_emitter)
+
+
+class ElasticsearchSinkBuilderBase(abc.ABC):
+    """
+    Base builder to construct an ElasticsearchSink.
+    """
+
+    @abc.abstractmethod
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = None
+
+    @abc.abstractmethod
+    def get_http_host_class(self):
+        """
+        Gets the org.apache.http.HttpHost class, whose path differs across Elasticsearch
+        versions.
+        """
+        pass
+
+    def set_emitter(self, emitter: ElasticsearchEmitter) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the emitter which is invoked on every record to convert it to Elasticsearch actions.
+
+        :param emitter: The emitter to process records into Elasticsearch actions.
+        """
+        self._j_elasticsearch_sink_builder.setEmitter(emitter._j_emitter)
+        return self
+
+    def set_hosts(self, hosts: Union[str, List[str]]) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the hosts where the Elasticsearch cluster nodes are reachable.
+        """
+        if not isinstance(hosts, list):
+            hosts = [hosts]
+        JHttpHost = self.get_http_host_class()
+        j_http_hosts_list = [JHttpHost.create(x) for x in hosts]
+        j_http_hosts_array = to_jarray(JHttpHost, j_http_hosts_list)
+        self._j_elasticsearch_sink_builder.setHosts(j_http_hosts_array)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) \
+            -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the desired DeliveryGuarantee. The default delivery guarantee is
+        DeliveryGuarantee#NONE.
+        """
+        j_delivery_guarantee = delivery_guarantee._to_j_delivery_guarantee()
+        self._j_elasticsearch_sink_builder.setDeliveryGuarantee(j_delivery_guarantee)
+        return self
+
+    def set_bulk_flush_max_actions(self, num_max_actions: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+        disable it. The default flush size is 1000.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushMaxActions(num_max_actions)
+        return self
+
+    def set_bulk_flush_max_size_mb(self, max_size_mb: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the maximum size of buffered actions, in MB, per bulk request. You can pass -1 to
+        disable it.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushMaxSizeMb(max_size_mb)
+        return self
+
+    def set_bulk_flush_interval(self, interval_millis: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushInterval(interval_millis)
+        return self
+
+    def set_bulk_flush_backoff_strategy(self,
+                                        flush_backoff_type: FlushBackoffType,
+                                        max_retries: int,
+                                        delay_millis: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the type of back-off to use when flushing bulk requests. The default bulk flush
+        back-off type is FlushBackoffType#NONE.
+
+        :param flush_backoff_type: The type of back-off to use when flushing bulk requests.
+        :param max_retries: The maximum number of retries for a back-off attempt when flushing
+                            bulk requests.
+        :param delay_millis: The amount of delay between each back-off attempt when flushing
+                             bulk requests, in milliseconds.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushBackoffStrategy(
+            flush_backoff_type._to_j_flush_backoff_type(), max_retries, delay_millis)
+        return self
+
+    def set_connection_username(self, username: str) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the username used to authenticate the connection with the Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionUsername(username)
+        return self
+
+    def set_connection_password(self, password: str) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the password used to authenticate the connection with the Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionPassword(password)
+        return self
+
+    def set_connection_path_prefix(self, prefix: str) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets a prefix which is used for every REST communication to the Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionPathPrefix(prefix)
+        return self
+
+    def set_connection_request_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the timeout for requesting a connection to the Elasticsearch cluster from the
+        connection manager.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionRequestTimeout(timeout)
+        return self
+
+    def set_connection_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the timeout for establishing a connection to the Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionTimeout(timeout)
+        return self
+
+    def set_socket_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the timeout for waiting for data or, put differently, the maximum period of
+        inactivity between two consecutive data packets.
+        """
+        self._j_elasticsearch_sink_builder.setSocketTimeout(timeout)
+        return self
+
+    def build(self) -> 'ElasticsearchSink':
+        """
+        Constructs the ElasticsearchSink with the properties configured on this builder.
+        """
+        return ElasticsearchSink(self._j_elasticsearch_sink_builder.build())
+
+
+class Elasticsearch6SinkBuilder(ElasticsearchSinkBuilderBase):
+    """
+    Builder to construct an Elasticsearch 6 compatible ElasticsearchSink.
+
+    The following example shows the minimal setup to create an ElasticsearchSink that submits
+    actions on checkpoint or once the default number of buffered actions (1000) is reached.
+
+    Example:
+    ::
+
+        >>> sink = Elasticsearch6SinkBuilder() \\
+        ...     .set_hosts('localhost:9200') \\
+        ...     .set_emitter(ElasticsearchEmitter.static_index("user", "key_col")) \\
+        ...     .build()
+    """
+
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder()
+
+    def get_http_host_class(self):
+        return get_gateway().jvm.org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost
+
+
+class Elasticsearch7SinkBuilder(ElasticsearchSinkBuilderBase):
+    """
+    Builder to construct an Elasticsearch 7 compatible ElasticsearchSink.
+
+    The following example shows the minimal setup to create an ElasticsearchSink that submits
+    actions on checkpoint or once the default number of buffered actions (1000) is reached.
+
+    Example:
+    ::
+
+        >>> sink = Elasticsearch7SinkBuilder() \\
+        ...     .set_hosts('localhost:9200') \\
+        ...     .set_emitter(ElasticsearchEmitter.dynamic_index("index_col", "key_col")) \\
+        ...     .build()
+    """
+
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder()
+
+    def get_http_host_class(self):
+        return get_gateway().jvm.org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost
+
+
+class ElasticsearchSink(Sink):
+    """
+    Flink Sink to insert or update data in an Elasticsearch index. The sink supports the
+    following delivery guarantees:
+
+    DeliveryGuarantee.NONE does not provide any guarantees: actions are flushed to Elasticsearch
+    only depending on the configuration of the bulk processor. In case of a failure, actions
+    might be lost if the bulk processor still has buffered actions.
+
+    DeliveryGuarantee.AT_LEAST_ONCE: on a checkpoint, the sink waits until all buffered actions
+    are flushed to and acknowledged by Elasticsearch. No actions will be lost, but actions might
+    be sent to Elasticsearch multiple times when Flink restarts. These additional requests may
+    cause inconsistent data in Elasticsearch right after the restart, but eventually everything
+    will be consistent again.
+    """
+    def __init__(self, j_elasticsearch_sink):
+        super(ElasticsearchSink, self).__init__(sink=j_elasticsearch_sink)
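
Putting the builder API above together, a minimal sketch of a sink configured with
at-least-once delivery and an exponential bulk flush back-off (host and retry values
are illustrative, not recommendations):

    from pyflink.datastream.connectors import DeliveryGuarantee
    from pyflink.datastream.connectors.elasticsearch import (Elasticsearch7SinkBuilder,
                                                             ElasticsearchEmitter,
                                                             FlushBackoffType)

    # Retry each failed bulk request up to 5 times, starting from a 1000 ms delay that
    # grows exponentially between attempts.
    sink = Elasticsearch7SinkBuilder() \
        .set_hosts('localhost:9200') \
        .set_emitter(ElasticsearchEmitter.static_index('user', 'key_col')) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
        .build()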
diff --git a/flink-python/pyflink/datastream/tests/test_connectors.py b/flink-python/pyflink/datastream/tests/test_connectors.py
index 6ff0d64019c..e0a1d2aa671 100644
--- a/flink-python/pyflink/datastream/tests/test_connectors.py
+++ b/flink-python/pyflink/datastream/tests/test_connectors.py
@@ -17,6 +17,9 @@
 ################################################################################
 from abc import ABC, abstractmethod
 
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, \
+    FlushBackoffType, ElasticsearchEmitter
+
 from pyflink.common import typeinfo, Duration, WatermarkStrategy, ConfigOptions
 from pyflink.common.serialization import JsonRowDeserializationSchema, \
     JsonRowSerializationSchema, Encoder, SimpleStringSchema
@@ -62,6 +65,84 @@ class ConnectorTestBase(PyFlinkTestCase, ABC):
             get_gateway().jvm.Thread.currentThread().setContextClassLoader(self._cxt_clz_loader)
 
 
+class FlinkElasticsearch7Test(ConnectorTestBase):
+
+    @classmethod
+    def _get_jars_relative_path(cls):
+        return '/flink-connectors/flink-sql-connector-elasticsearch7'
+
+    def test_es_sink(self):
+        ds = self.env.from_collection(
+            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+            type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+        es_sink = Elasticsearch7SinkBuilder() \
+            .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+            .set_hosts(['localhost:9200']) \
+            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+            .set_bulk_flush_max_actions(1) \
+            .set_bulk_flush_max_size_mb(2) \
+            .set_bulk_flush_interval(1000) \
+            .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
+            .set_connection_username('foo') \
+            .set_connection_password('bar') \
+            .set_connection_path_prefix('foo-bar') \
+            .set_connection_request_timeout(30000) \
+            .set_connection_timeout(31000) \
+            .set_socket_timeout(32000) \
+            .build()
+
+        j_emitter = get_field_value(es_sink.get_java_function(), 'emitter')
+        self.assertTrue(
+            is_instance_of(
+                j_emitter,
+                'org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter'))
+        self.assertEqual(
+            get_field_value(
+                es_sink.get_java_function(), 'hosts')[0].toString(), 'http://localhost:9200')
+        self.assertEqual(
+            get_field_value(
+                es_sink.get_java_function(), 'deliveryGuarantee').toString(), 'at-least-once')
+
+        j_build_bulk_processor_config = get_field_value(
+            es_sink.get_java_function(), 'buildBulkProcessorConfig')
+        self.assertEqual(j_build_bulk_processor_config.getBulkFlushMaxActions(), 1)
+        self.assertEqual(j_build_bulk_processor_config.getBulkFlushMaxMb(), 2)
+        self.assertEqual(j_build_bulk_processor_config.getBulkFlushInterval(), 1000)
+        self.assertEqual(j_build_bulk_processor_config.getFlushBackoffType().toString(), 'CONSTANT')
+        self.assertEqual(j_build_bulk_processor_config.getBulkFlushBackoffRetries(), 3)
+        self.assertEqual(j_build_bulk_processor_config.getBulkFlushBackOffDelay(), 3000)
+
+        j_network_client_config = get_field_value(
+            es_sink.get_java_function(), 'networkClientConfig')
+        self.assertEqual(j_network_client_config.getUsername(), 'foo')
+        self.assertEqual(j_network_client_config.getPassword(), 'bar')
+        self.assertEqual(j_network_client_config.getConnectionRequestTimeout(), 30000)
+        self.assertEqual(j_network_client_config.getConnectionTimeout(), 31000)
+        self.assertEqual(j_network_client_config.getSocketTimeout(), 32000)
+        self.assertEqual(j_network_client_config.getConnectionPathPrefix(), 'foo-bar')
+
+        ds.sink_to(es_sink).name('es sink')
+
+    def test_es_sink_dynamic(self):
+        ds = self.env.from_collection(
+            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+            type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+        es_dynamic_index_sink = Elasticsearch7SinkBuilder() \
+            .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
+            .set_hosts(['localhost:9200']) \
+            .build()
+
+        j_emitter = get_field_value(es_dynamic_index_sink.get_java_function(), 'emitter')
+        self.assertTrue(
+            is_instance_of(
+                j_emitter,
+                'org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter'))
+
+        ds.sink_to(es_dynamic_index_sink).name('es dynamic index sink')
+
+
 class FlinkKafkaTest(ConnectorTestBase):
 
     @classmethod
diff --git a/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py b/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py
new file mode 100644
index 00000000000..0836b1d18ab
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/elasticsearch.py
@@ -0,0 +1,140 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import logging
+import sys
+
+from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
+    Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import DeliveryGuarantee
+
+
+def write_to_es6(env):
+    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
+        'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
+    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+    ds = env.from_collection(
+        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+        type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+    es_sink = Elasticsearch6SinkBuilder() \
+        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
+        .set_hosts(['localhost:9200']) \
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+        .set_bulk_flush_max_actions(1) \
+        .set_bulk_flush_max_size_mb(2) \
+        .set_bulk_flush_interval(1000) \
+        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
+        .set_connection_username('foo') \
+        .set_connection_password('bar') \
+        .set_connection_path_prefix('foo-bar') \
+        .set_connection_request_timeout(30000) \
+        .set_connection_timeout(31000) \
+        .set_socket_timeout(32000) \
+        .build()
+
+    ds.sink_to(es_sink).name('es6 sink')
+
+    env.execute()
+
+
+def write_to_es6_dynamic_index(env):
+    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
+        'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
+    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+    ds = env.from_collection(
+        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+        type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+    es_sink = Elasticsearch6SinkBuilder() \
+        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
+        .set_hosts(['localhost:9200']) \
+        .build()
+
+    ds.sink_to(es_sink).name('es6 dynamic index sink')
+
+    env.execute()
+
+
+def write_to_es7(env):
+    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
+        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
+    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+    ds = env.from_collection(
+        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+        type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+    es7_sink = Elasticsearch7SinkBuilder() \
+        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
+        .set_hosts(['localhost:9200']) \
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+        .set_bulk_flush_max_actions(1) \
+        .set_bulk_flush_max_size_mb(2) \
+        .set_bulk_flush_interval(1000) \
+        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
+        .set_connection_username('foo') \
+        .set_connection_password('bar') \
+        .set_connection_path_prefix('foo-bar') \
+        .set_connection_request_timeout(30000) \
+        .set_connection_timeout(31000) \
+        .set_socket_timeout(32000) \
+        .build()
+
+    ds.sink_to(es7_sink).name('es7 sink')
+
+    env.execute()
+
+
+def write_to_es7_dynamic_index(env):
+    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
+        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
+    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
+
+    ds = env.from_collection(
+        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
+        type_info=Types.MAP(Types.STRING(), Types.STRING()))
+
+    es7_sink = Elasticsearch7SinkBuilder() \
+        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
+        .set_hosts(['localhost:9200']) \
+        .build()
+
+    ds.sink_to(es7_sink).name('es7 dynamic index sink')
+
+    env.execute()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+
+    print("start writing data to elasticsearch6")
+    write_to_es6(env)
+    write_to_es6_dynamic_index(env)
+
+    print("start writing data to elasticsearch7")
+    write_to_es7(env)
+    write_to_es7_dynamic_index(env)