Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/03/15 14:39:03 UTC

Working with DataStreams of Java objects in Pyflink?

Hi all,

I'm looking to use PyFlink to work with some Scala-defined objects being
emitted from a custom source. When I try to manipulate the objects in a
PyFlink-defined MapFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.datastream.html#pyflink.datastream.MapFunction>,
I hit an error like:

Caused by: java.lang.UnsupportedOperationException: The type information:
Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
Option[Long])] is not supported in PyFlink currently.

The Scala object is defined something like:

```
object <...> {
  case class Record(
    id: Long,
    created_at: Option[Long],
    updated_at: Option[Long],
    ...
  )
}
```

The PyFlink code is something like:

```
class Mutate(MapFunction):
    def map(self, value):
        print(value.id)
        value.id = 123
        return value  # a MapFunction must return the transformed value

...

records = env.add_source(...)
records = records.map(Mutate())
```

Can you provide any advice on how to work with these kinds of objects in
PyFlink?

Thanks in advance!

Re: Working with DataStreams of Java objects in Pyflink?

Posted by Kevin Lam <ke...@shopify.com>.
Hi Shuiqiang Chen,

Thanks for the quick response. Oh I see, it's too bad that POJO types
aren't currently supported.

I'd like to check if I understand your suggestion about RowType. You're
suggesting something like:

1/ Define subclasses of RowType in Java/Scala to hold the Java objects we
want to manipulate in Python.
2/ When data streams/sources emit objects of this type in PyFlink, we can
read from and mutate these Java-defined RowTypes as needed, since Python
doesn't know how to handle arbitrary POJOs but does know how to handle
RowType objects.

Is that correct? A simple example of extending/using RowType would be
helpful if you have a chance.
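
In the meantime, here's my rough guess at how our MapFunction would change,
just to check my understanding (untested; I'm assuming the record arrives as
a Row with the fields in the declared order):

```
from pyflink.common import Row
from pyflink.datastream import MapFunction

class Mutate(MapFunction):
    def map(self, value):
        # value should now be a Row rather than the Scala Record.
        print(value[0])  # id
        # Build a new Row instead of mutating the value in place.
        return Row(123, value[1], value[2])
```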

Thanks again for all your help, here and in the other threads on this
mailing list, really appreciate it!!


Re: Working with DataStreams of Java objects in Pyflink?

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Kevin,

Currently, POJO types are not supported in the Python DataStream API because
it is hard to handle the conversion between Python objects and Java objects.
Maybe you can use a Row type to represent the POJO class, such as
Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(),
Types.LONG(), Types.LONG()]). We will try to support POJO types in the
future.
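
In code, that would look something like the following (a rough sketch; I am
assuming your custom source can emit the records as rows, and `your_source`
is just a placeholder for it):

```
from pyflink.common.typeinfo import Types

# Describe the record as a named Row type instead of a POJO. The
# Option[Long] fields are modeled here as nullable longs.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

# The type info tells PyFlink how to convert records crossing the
# Java/Python boundary; output_type does the same for the mapped stream.
# Mutate is the map function from your earlier mail.
records = env.add_source(your_source, type_info=record_type)
records = records.map(Mutate(), output_type=record_type)
```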

Best,
Shuiqiang
