Posted to user@flink.apache.org by Francis Conroy <fr...@switchdin.com> on 2022/02/25 06:35:29 UTC

pyflink object to java object

Hi all,

We're using pyflink for most of our flink work and sometimes call into a
java process function.
Our new java process function takes a constructor argument which is a Row
containing default values. I've declared my Row in pyflink like this:

from pyflink.common import Row, Types
from pyflink.common.typeinfo import RowTypeInfo

# Default values, one per field of the row type below
default_row = Row(ep_uuid="",
                  unit_uuid=None,
                  unit_longitude=None,
                  unit_latitude=None,
                  unit_state=None,
                  unit_country=None,
                  pf_uuid=None,
                  pf_name=None)

row_type_information = RowTypeInfo([Types.STRING(),  # ep_uuid
                                    Types.STRING(),  # unit_uuid
                                    Types.DOUBLE(),  # unit_longitude
                                    Types.DOUBLE(),  # unit_latitude
                                    Types.STRING(),  # unit_state
                                    Types.STRING(),  # unit_country
                                    Types.STRING(),  # pf_uuid
                                    Types.STRING()   # pf_name
                                    ])

I'm now trying to get a handle to a java row object in the jvm so I can
pass that into the process function's constructor.

endpoint_info_enriched_stream = DataStream(
    ds._j_data_stream.connect(endpoint_info_stream._j_data_stream).process(
        jvm.org.switchdin.operators.TableEnrich(j_obj)))

I've tried a few approaches, but I really can't figure out how to do this.
I'm not sure what I need on each side: a coder, a serializer, a pickler?

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia

Re: pyflink object to java object

Posted by Francis Conroy <fr...@switchdin.com>.
Hi Xingbo,

I think that might work for me. I'll give it a try.

On Tue, 1 Mar 2022 at 15:06, Xingbo Huang <hx...@gmail.com> wrote:

> Hi,
> With py4j, you can call any Java method. To create a Java Row, you
> can call the `createRowWithNamedPositions` method of `RowUtils`[1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowUtils.java#
>
> Best,
> Xingbo


Re: pyflink object to java object

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
With py4j, you can call any Java method. To create a Java Row, you
can call the `createRowWithNamedPositions` method of `RowUtils`[1].

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowUtils.java#

Best,
Xingbo
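
For illustration, here is a minimal sketch of building the Java Row through py4j. It does not use `RowUtils`; instead it assumes the public `Row(int arity)` constructor and `setField(int, Object)` are enough, which gives a position-based row without field names. `build_java_row` and `DEFAULT_FIELDS` are hypothetical names, and the `jvm` argument is assumed to be the py4j view obtained from `pyflink.java_gateway.get_gateway().jvm`.

```python
# Field names and default values, in the same order as the row type
# declared in the original question.
DEFAULT_FIELDS = [
    ("ep_uuid", ""),
    ("unit_uuid", None),
    ("unit_longitude", None),
    ("unit_latitude", None),
    ("unit_state", None),
    ("unit_country", None),
    ("pf_uuid", None),
    ("pf_name", None),
]


def build_java_row(jvm, fields):
    """Create an org.apache.flink.types.Row on the JVM side (hypothetical helper).

    jvm:    py4j JVM view, assumed to come from get_gateway().jvm
    fields: list of (name, value) pairs; only the order and values are used
    """
    # Row(int arity) allocates a position-based row with every field null.
    j_row = jvm.org.apache.flink.types.Row(len(fields))
    for position, (_name, value) in enumerate(fields):
        # py4j converts str to java.lang.String and None to null.
        j_row.setField(position, value)
    return j_row


# Usage inside a PyFlink job (needs a running gateway, so left as comments):
#   from pyflink.java_gateway import get_gateway
#   jvm = get_gateway().jvm
#   j_obj = build_java_row(jvm, DEFAULT_FIELDS)
#   jvm.org.switchdin.operators.TableEnrich(j_obj)
```

In this sketch the row reaches the constructor as a py4j reference to an object that already lives in the JVM, so no Python-side coder or pickler should be needed for this step. Whether `TableEnrich` expects positional or named fields is not stated in the thread; if it needs names, the `RowUtils` method mentioned above is the route to look at.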
