Posted to user@flink.apache.org by Ishwara Varnasi <iv...@gmail.com> on 2018/05/10 14:43:39 UTC

How to use keyBy on ConnectedStream?

Hello,
I am using ConnectedStreams to process two different types of messages with a
CoFlatMap. I would like to use keyBy on the ConnectedStreams so that messages
with the same value of a certain property are always sent to the same
CoFlatMap instance. I tried keyBy on the ConnectedStreams and was surprised
to see that the return type is not grouped.

ConnectedStreams<MessageType1, MessageType2> connect =
    myDataStream1.connect(myDataStreamOther);
connect = connect.keyBy("property1", "property2");

// property1 is a valid property of MessageType1 and property2 is a
// valid property of MessageType2

However, I get the following exception:

Caused by: org.apache.flink.api.common.InvalidProgramException: This
type (GenericType<com....MessageType1>) cannot be used as key.

How can I use keyBy with ConnectedStreams and ensure that grouped messages
are handled by the same CoFlatMap instance?

thanks
Ishwara Varnasi

Re: How to use keyBy on ConnectedStream?

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Ishwara,

the `keyBy()` method automatically ensures that records with the same key will be processed by the same instance of a CoFlatMap.
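
To illustrate the idea only (this is not Flink's actual implementation, which assigns keys to parallel instances through key groups), here is a minimal plain-Java sketch. The class `KeyRoutingSketch`, the helper `instanceFor`, and the stripped-down message classes are hypothetical names made up for this example. It shows that when both inputs yield an equal key, a hash-based router picks the same instance for both.

```java
public class KeyRoutingSketch {

    // Hypothetical message types; only the key property is modelled.
    static final class MessageType1 {
        final String property1;
        MessageType1(String property1) { this.property1 = property1; }
    }

    static final class MessageType2 {
        final String property2;
        MessageType2(String property2) { this.property2 = property2; }
    }

    // Simplified stand-in for key-based partitioning (an assumption, not
    // Flink code): equal keys have equal hash codes, so they map to the
    // same index in [0, parallelism).
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        MessageType1 first = new MessageType1("order-42");
        MessageType2 second = new MessageType2("order-42");

        int target1 = instanceFor(first.property1, parallelism);
        int target2 = instanceFor(second.property2, parallelism);

        // Both records carry the same key value, so they share a target.
        System.out.println(target1 == target2); // prints true
    }
}
```

The sketch only works because both key properties have the same type and a value-based `hashCode()`; two keys of different types or with identity-based hash codes would not reliably end up together.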

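As for the exception, I suppose the types `MessageType1` and `MessageType2` are meant to be POJOs, which must follow some rules [1]. The `GenericType` in the message suggests that `MessageType1` was not recognized as a POJO, so its fields cannot be referenced by name.
Also, make sure that (1) `property1` and `property2` are not arrays; (2) their types have overridden the `hashCode()` method [2].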
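
As a rough sketch of what these rules look like in code, assuming a String key property and a hypothetical composite key type (`PojoKeySketch`, `CustomerKey`, and the fields `region` and `id` are invented for this example and are not from the original program):

```java
import java.util.Objects;

public class PojoKeySketch {

    // Hypothetical message type shaped like a POJO: a public class with a
    // public no-argument constructor, whose fields are reachable through
    // public getters and setters.
    public static class MessageType1 {
        private String property1;

        public MessageType1() {}

        public String getProperty1() { return property1; }
        public void setProperty1(String property1) { this.property1 = property1; }
    }

    // Hypothetical key type with value-based equals() and hashCode(), so
    // that two keys with the same contents are treated as the same key.
    public static final class CustomerKey {
        private final String region;
        private final int id;

        public CustomerKey(String region, int id) {
            this.region = region;
            this.id = id;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof CustomerKey)) return false;
            CustomerKey that = (CustomerKey) other;
            return id == that.id && Objects.equals(region, that.region);
        }

        @Override
        public int hashCode() {
            return Objects.hash(region, id);
        }
    }

    public static void main(String[] args) {
        MessageType1 message = new MessageType1();
        message.setProperty1("order-42");
        System.out.println(message.getProperty1()); // prints order-42

        CustomerKey a = new CustomerKey("eu", 7);
        CustomerKey b = new CustomerKey("eu", 7);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true
    }
}
```

An array such as `int[]` would be a poor key here because Java arrays inherit the identity-based `hashCode()` from `Object`, so two arrays with the same contents generally hash differently.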

Hope that helps,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
