Posted to user@flink.apache.org by ja...@ubs.com on 2020/02/17 10:31:17 UTC

Flink's Either type information

Hi all,

How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type." when doing: out.collect(Either.<MyLeftType, MyRightType>Right(myObject));

Thanks,

Jacopo Gobbi


Re: Flink's Either type information

Posted by Arvid Heise <ar...@ververica.com>.
Hi Jacopo,

to work around type erasure in Java, you need to create a sub-type in which
all type arguments are concrete (reified) types.

Instead of instantiating the generic class directly, as in

     stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

you can use

     stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>() {
     });

This will create an anonymous sub-type of MyKeyedBroadcastProcessFunction
that has the two types reified.
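To see why the anonymous sub-type helps, here is a minimal plain-Java sketch (no Flink dependency; ErasureDemo and GenericFunction are hypothetical names standing in for MyKeyedBroadcastProcessFunction). It reads a class's generic superclass via reflection, which is roughly the kind of information Flink's type extraction has to work with: a directly instantiated generic object records no type arguments, while an anonymous subclass records them in its class signature.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

class ErasureDemo {

    // Hypothetical stand-in for a generic user function such as
    // MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>.
    static class GenericFunction<L, R> { }

    // Returns the type arguments that the runtime class of `fn` declares for its
    // generic superclass, or an empty array if none are recorded.
    static Type[] declaredTypeArgs(Object fn) {
        Type superType = fn.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // Direct instantiation: the runtime class is GenericFunction itself, whose
        // superclass is Object, so String and Integer are recorded nowhere.
        Object plain = new GenericFunction<String, Integer>();

        // Anonymous subclass: its class signature records
        // GenericFunction<String, Integer> as the generic superclass.
        Object anonymous = new GenericFunction<String, Integer>() { };

        System.out.println(Arrays.toString(declaredTypeArgs(plain)));     // []
        System.out.println(Arrays.toString(declaredTypeArgs(anonymous))); // [class java.lang.String, class java.lang.Integer]
    }
}
```

The extra `{ }` is all it takes: the compiler emits a new class whose declared superclass keeps the concrete type arguments.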

Another solution is to create the sub-type directly in your factory method.

     // MyLeftType and MyRightType must be concrete classes here; if they were
     // type parameters of createFunction itself, they would not be reified.
     KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>>
     createFunction(...) {
          return new KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>>() {
              ...
          };
     }
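One caveat worth checking, sketched in plain Java with hypothetical names (FactoryDemo, GenericFunction, concreteFactory, genericFactory): the anonymous sub-type only carries concrete types if they are concrete where the class is written. If the factory method declares its own type parameters, the anonymous class records those type variables instead, so, as far as I understand, Flink has nothing concrete to extract for an output-only type such as Either<L, R>.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class FactoryDemo {

    // Hypothetical stand-in for a function whose output type is Either<L, R>.
    static class GenericFunction<L, R> { }

    // Factory written with concrete types: the anonymous subclass records them.
    static GenericFunction<String, Integer> concreteFactory() {
        return new GenericFunction<String, Integer>() { };
    }

    // Generic factory: the anonymous subclass can only record the method's own
    // type variables L and R, not what a caller later substitutes for them.
    static <L, R> GenericFunction<L, R> genericFactory() {
        return new GenericFunction<L, R>() { };
    }

    // True only if every type argument of the generic superclass is a concrete class.
    static boolean isReified(Object fn) {
        Type superType = fn.getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            return false;
        }
        for (Type arg : ((ParameterizedType) superType).getActualTypeArguments()) {
            if (!(arg instanceof Class<?>)) {
                return false; // e.g. a TypeVariable such as L
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isReified(concreteFactory()));                              // true
        System.out.println(isReified(FactoryDemo.<String, Integer>genericFactory())); // false
    }
}
```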


RE: Flink's Either type information

Posted by ja...@ubs.com on 2020/03/04.
Hi all,

Yes, my problem is that I do not create the function inline; I instantiate it directly when building the DataStream job.
My code (which I cannot share) is exactly like your example, Yun. Are you aware of a way to prevent the type erasure?

Kind regards,

Jacopo Gobbi


Re: Flink's Either type information

Posted by Yun Gao <yu...@aliyun.com> on 2020/02/21.
      Hi Jacopo, Robert,

         Very sorry for missing your previous email and not responding in time. As Robert's example shows, using an inline anonymous subclass of KeyedBroadcastProcessFunction should not cause the problem. As far as I know, a likely cause of the exception is that the type parameters of Either get erased because of the way the KeyedBroadcastProcessFunction object is created. For example, if you first implement a generic subclass of KeyedBroadcastProcessFunction like:

public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType> extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>> { ... }

     and create a function object directly when constructing the DataStream job:

stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

     Then MyLeftType and MyRightType will be erased, which causes the attached exception when Flink tries to infer the output type.
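When the generic function has to be instantiated directly like this, one way around erasure is to stop relying on reflection and hand the types to the function explicitly. As far as I know, Flink's type extractor first checks whether a function implements ResultTypeQueryable and, if so, uses its getProducedType() result; an explicit output TypeInformation can also be passed to process(...). The plain-Java sketch below only mimics that idea: OutputTypeQueryable, TypedEitherFunction and resolve are hypothetical names, not Flink APIs, and Class tokens stand in for Flink's TypeInformation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

class ExplicitTypesDemo {

    // Hypothetical analogue of Flink's ResultTypeQueryable: the function can
    // report its output type arguments itself.
    interface OutputTypeQueryable {
        Class<?>[] outputTypeArgs();
    }

    // Hypothetical generic function that is instantiated directly (no anonymous
    // subclass), so it receives its type arguments as Class tokens instead.
    static class TypedEitherFunction<L, R> implements OutputTypeQueryable {
        private final Class<L> leftType;
        private final Class<R> rightType;

        TypedEitherFunction(Class<L> leftType, Class<R> rightType) {
            this.leftType = leftType;
            this.rightType = rightType;
        }

        @Override
        public Class<?>[] outputTypeArgs() {
            return new Class<?>[] {leftType, rightType};
        }
    }

    // Asks the function first; only falls back to reflection on the generic
    // superclass if the function cannot describe its own output.
    static String resolve(Object fn) {
        if (fn instanceof OutputTypeQueryable) {
            return Arrays.toString(((OutputTypeQueryable) fn).outputTypeArgs());
        }
        Type superType = fn.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return Arrays.toString(((ParameterizedType) superType).getActualTypeArguments());
        }
        return "unknown (type arguments erased)";
    }

    public static void main(String[] args) {
        // Direct instantiation, as in the failing job, but with explicit type tokens.
        System.out.println(resolve(new TypedEitherFunction<>(String.class, Integer.class)));
        // An object that carries no type information at all.
        System.out.println(resolve(new Object()));
    }
}
```

In a real Flink job the tokens would be TypeInformation objects, for example an EitherTypeInfo built from the left and right TypeInformation; treat the exact wiring as something to verify against the Flink version in use.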

     And I totally agree with Robert that attaching the corresponding code would help debug the problem.

  Yours,
    Yun



Re: Flink's Either type information

Posted by Robert Metzger <rm...@apache.org> on 2020/02/21.
Hey Jacopo,
can you post an example that reproduces the issue? I tried it, and it worked in this artificial example:

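import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

// `input` is assumed to be an existing DataStream<String>.
MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
      .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, value))
      .returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
      .keyBy(0).connect(input.broadcast(state))
      // inline anonymous subclass: all type arguments, including Either<Integer, String>, are concrete
      .process(new KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<Integer, String>>() {
         @Override
         public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws Exception {
            out.collect(Either.Left(111));
         }
         @Override
         public void processBroadcastElement(String value, Context ctx, Collector<Either<Integer, String>> out) throws Exception { }
      });
result.print();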


RE: Flink's Either type information

Posted by ja...@ubs.com on 2020/02/19.
Yes, I create it the way you mentioned.


Re: Flink's Either type information

Posted by Yun Gao <yu...@aliyun.com> on 2020/02/18.
      Hi Jacopo,

          Could you also show how the KeyedBroadcastProcessFunction is created when constructing the DataStream job? For example, are you using something like

          new KeyedBroadcastProcessFunction<Integer, Integer, Integer, Either<MyLeft, MyRight>>() {
                       // Function implementation
             }

             or something else?

     Best, 
      Yun


