Posted to user@flink.apache.org by Sandeep Sharat <sa...@gmail.com> on 2022/03/29 12:50:43 UTC

Pyflink elastic search connectors

Hello Everyone,

I have been working on a streaming application that uses Elasticsearch as
the sink, and I implemented it quite easily with the Java API. Due to a
recent policy change, however, we are moving to the Python API for Flink,
and we have been unable to find any Python Elasticsearch connector for
Flink, although we did find support for the Kafka connectors in Python.
Does this mean that we have to write our own connectors in Python to make
use of the flink-elasticsearch connector jar?

Thanks in advance
-- 
Thanks & Regards
Sandeep Sharat Kumar

Re: Pyflink elastic search connectors

Posted by Sandeep Sharat <sa...@gmail.com>.
Thank you for your reply. Now I have a better understanding of it.

On Wed, 30 Mar, 2022, 5:29 pm LuNing Wang, <wa...@gmail.com> wrote:


Re: Pyflink elastic search connectors

Posted by LuNing Wang <wa...@gmail.com>.
Hi,

The principle behind the Python DataStream connectors is inter-process
communication via Py4J: the Python code calls the Java connector API in the
JVM. I got blocked by a class-loading problem, so I haven't finished the PR
for the Python ES DataStream connector yet. Compared with other connectors,
ES is a little more troublesome, because implementing the
ElasticsearchEmitter interface in Python code is difficult.
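To make the Py4J principle concrete, here is a minimal, hypothetical sketch of how a Python wrapper reaches a Java class. Py4J exposes the JVM as a `jvm` view whose attributes resolve one package segment at a time (for example `gateway.jvm.java.util.ArrayList`), and the wrapper simply keeps a handle to the Java object it constructs. `resolve_java_class` and `JavaObjectWrapper` are illustrative names, not PyFlink API; in PyFlink the `jvm` argument would come from `pyflink.java_gateway.get_gateway().jvm`.

```python
from functools import reduce
from typing import Any


def resolve_java_class(jvm: Any, class_name: str) -> Any:
    """Walk a Py4J-style JVM view one dotted segment at a time.

    With a real Py4J gateway, resolve_java_class(gateway.jvm, "java.util.ArrayList")
    is equivalent to writing gateway.jvm.java.util.ArrayList.
    """
    return reduce(getattr, class_name.split("."), jvm)


class JavaObjectWrapper:
    """Hypothetical thin wrapper: builds a Java object and keeps a reference to it.

    The Python object only holds the reference; the actual connector logic
    lives in the JVM object behind it.
    """

    def __init__(self, jvm: Any, class_name: str, *args: Any) -> None:
        java_class = resolve_java_class(jvm, class_name)
        # With Py4J, calling the class proxy invokes the Java constructor in the JVM.
        self._j_obj = java_class(*args)

    def get_java_object(self) -> Any:
        return self._j_obj
```

A related pitfall (not necessarily the class-loading issue mentioned above): Py4J does not fail when a class is missing from the JVM classpath; it returns a `JavaPackage` placeholder, and the error only surfaces when you try to call it. The connector jar therefore has to be on the classpath (for example via the `pipeline.jars` option) before such a wrapper is constructed.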

If you want to use the Python DataStream API to process ES data and don't
have any custom wrapper code, you can use the PyFlink Table API to connect
to ES and convert between Table and DataStream as needed. See the
DataStream API integration docs for the Table API:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/
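A minimal sketch of that route, assuming PyFlink 1.14, a `StreamTableEnvironment` created from the same `StreamExecutionEnvironment`, a DataStream of `Row` elements with a declared row type, and an Elasticsearch sink table already registered through DDL. The helper name `write_stream_to_es` and the table name `es_sink` are hypothetical; `from_data_stream` and `execute_insert` are PyFlink Table API methods.

```python
from typing import Any


def write_stream_to_es(t_env: Any, ds: Any, sink_table: str = "es_sink") -> Any:
    """Hypothetical helper: route a DataStream into an Elasticsearch table sink.

    t_env is expected to be a pyflink.table.StreamTableEnvironment and ds a
    pyflink.datastream.DataStream of Row elements; sink_table must already be
    registered with the Elasticsearch SQL connector.
    """
    # DataStream -> Table: column names and types come from the stream's row type.
    table = t_env.from_data_stream(ds)
    # Table -> Elasticsearch: submits an INSERT INTO job and returns a TableResult.
    return table.execute_insert(sink_table)
```

Typical usage: `env = StreamExecutionEnvironment.get_execution_environment()`, `t_env = StreamTableEnvironment.create(env)`, register the sink with `t_env.execute_sql(...)`, then call `write_stream_to_es(t_env, ds)`. When running a local script, calling `.wait()` on the returned `TableResult` keeps the process alive until the job finishes.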

Best,
LuNing Wang


On Wed, 30 Mar 2022 at 18:38, Sandeep Sharat <sa...@gmail.com> wrote:


Re: Pyflink elastic search connectors

Posted by Sandeep Sharat <sa...@gmail.com>.
Hi,

I am pretty much a novice in Python, so writing an entire wrapper in
Python may be a tough nut to crack for me. Just out of curiosity, though,
I want to ask why the connectors were not implemented in the Python API.
Is it because there are very few use cases, or because most Elasticsearch
use cases are handled with the Table API?

On Wed, 30 Mar, 2022, 9:58 am Sandeep Sharat, <sa...@gmail.com>
wrote:


Re: Pyflink elastic search connectors

Posted by Sandeep Sharat <sa...@gmail.com>.
Hi,

Thank you for the quick responses. We are using the DataStream API in
PyFlink, and we are trying to implement a Python wrapper for the connector
as we speak. Hopefully it will work out. 😊

On Wed, 30 Mar, 2022, 8:02 am Xingbo Huang, <hx...@gmail.com> wrote:


Re: Pyflink elastic search connectors

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

Are you using the DataStream API or the Table API? If you are using the
Table API, you can use the connector by executing SQL [1]. If you are using
the DataStream API, there is indeed no ES connector API provided, so you
need to write Python wrapper code, but the wrapper code is very simple: it
uses Py4J to call the Java API of the ES connector. For details, you can
refer to the wrapper code for Kafka or Pulsar [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py
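For the SQL route in [1], the Elasticsearch sink is declared with a `CREATE TABLE` statement and registered with `execute_sql`. The helper below is a hypothetical sketch that only assembles that DDL string; the table name, columns, hosts and index are placeholders, and it assumes the Elasticsearch 7 SQL connector (`'connector' = 'elasticsearch-7'`) jar is on the classpath.

```python
from typing import Optional, Sequence, Tuple


def build_es_sink_ddl(
    table_name: str,
    columns: Sequence[Tuple[str, str]],
    hosts: str,
    index: str,
    primary_key: Optional[str] = None,
) -> str:
    """Assemble Flink SQL DDL for an Elasticsearch 7 sink table.

    hosts: one or more 'http://host:port' entries separated by ';'.
    """
    column_lines = [f"  `{name}` {sql_type}" for name, sql_type in columns]
    if primary_key is not None:
        # With a primary key the sink works in upsert mode and derives the
        # document id from the key; without one, Elasticsearch generates ids.
        column_lines.append(f"  PRIMARY KEY (`{primary_key}`) NOT ENFORCED")
    body = ",\n".join(column_lines)
    return (
        f"CREATE TABLE `{table_name}` (\n{body}\n) WITH (\n"
        "  'connector' = 'elasticsearch-7',\n"
        f"  'hosts' = '{hosts}',\n"
        f"  'index' = '{index}'\n"
        ")"
    )
```

In PyFlink the resulting string would be passed to `t_env.execute_sql(ddl)`, after which data can be written with `INSERT INTO es_sink SELECT ...` or `Table.execute_insert("es_sink")`. The connector jar can be supplied through the `pipeline.jars` configuration option.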

Best,
Xingbo

On Tue, 29 Mar 2022 at 20:51, Sandeep Sharat <sa...@gmail.com> wrote:
