You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 大森林 <ap...@foxmail.com> on 2020/10/02 14:39:34 UTC

what's wrong with my pojo when it's used by flink ?Thanks

I want to do an experiment with the operator "aggregate"


My code is:


Aggregate.java

https://paste.ubuntu.com/p/vvMKqZXt3r/


UserActionLogPOJO.java
https://paste.ubuntu.com/p/rfszzKbxDC/




The error I got is:



Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<UserActionLogPOJO&gt;Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init&gt;(ComparableAggregator.java:67)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
&nbsp;&nbsp; &nbsp;at Aggregate.main(Aggregate.java:52)

Process finished with exit code 1

Could you tell me where I am wrong with UserActionLogPOJO.java?

Thanks for your help

reply: what's wrong with my pojo when it's used by flink ?Thanks

Posted by 大森林 <ap...@foxmail.com>.
Thanks for your help~
I have solved this problem under your guidance


Close this issue please.


MUCH THANKS




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "Arvid Heise"                                                                                    <arvid@ververica.com&gt;;
发送时间:&nbsp;2020年10月3日(星期六) 凌晨1:35
收件人:&nbsp;"大森林"<appleyuchi@foxmail.com&gt;;
抄送:&nbsp;"user"<user@flink.apache.org&gt;;
主题:&nbsp;Re: what's wrong with my pojo when it's used by flink ?Thanks



Hi 大森林,


if you look in the full logs you'll see


3989 [main] INFO &nbsp;org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.test.checkpointing.UserActionLogPOJO does not contain a getter for field itemId
3999 [main] INFO &nbsp;org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.test.checkpointing.UserActionLogPOJO does not contain a setter for field itemId
3999 [main] INFO &nbsp;org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.test.checkpointing.UserActionLogPOJO cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types &amp; Serialization" for details of the effect on performance.
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<org.apache.flink.test.checkpointing.UserActionLogPOJO&gt;Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
	at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
	at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init&gt;(ComparableAggregator.java:67)
	at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:809)
	at org.apache.flink.test.checkpointing.Aggregate.main(UnalignedCheckpointITCase.java:701)


The issue is that in your POJO your setter/getter do not match the field names. It's easiest to let your IDE generate them for you. For example, if you keep the current field names, you need to add



public String getItemId() {
   return itemId;
}

public void setItemId(String itemId) {
   this.itemId = itemId;
}

public void setPrice(int price) {
   this.price = price;
}

public void setUserId(String userId) {
   this.userId = userId;
}

public int getPrice() {
   return price;
}

public String getUserId() {
   return userId;
}

As you can see, none of your getters/setters is according to Java Beans specification and you need to add them all.



On Fri, Oct 2, 2020 at 4:39 PM 大森林 <appleyuchi@foxmail.com&gt; wrote:



I want to do an experiment with the operator "aggregate"


My code is:


Aggregate.java

https://paste.ubuntu.com/p/vvMKqZXt3r/


UserActionLogPOJO.java
https://paste.ubuntu.com/p/rfszzKbxDC/




The error I got is:



Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<UserActionLogPOJO&gt;Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init&gt;(ComparableAggregator.java:67)
&nbsp;&nbsp; &nbsp;at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
&nbsp;&nbsp; &nbsp;at Aggregate.main(Aggregate.java:52)

Process finished with exit code 1

Could you tell me where I am wrong with UserActionLogPOJO.java?

Thanks for your help







-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng&nbsp;&nbsp;&nbsp;

Re: what's wrong with my pojo when it's used by flink ?Thanks

Posted by Arvid Heise <ar...@ververica.com>.
Hi 大森林,

if you look in the full logs you'll see

3989 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
class org.apache.flink.test.checkpointing.UserActionLogPOJO does not
contain a getter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
class org.apache.flink.test.checkpointing.UserActionLogPOJO does not
contain a setter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
Class class org.apache.flink.test.checkpointing.UserActionLogPOJO cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance.
Exception in thread "main"
org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by field expression on
GenericType<org.apache.flink.test.checkpointing.UserActionLogPOJO>Field
expressions are only supported on POJO types, tuples, and case classes.
(See the Flink documentation on what is considered a POJO.)
at
org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
at
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:67)
at
org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:809)
at
org.apache.flink.test.checkpointing.Aggregate.main(UnalignedCheckpointITCase.java:701)

The issue is that in your POJO your setter/getter do not match the field
names. It's easiest to let your IDE generate them for you. For example, if
you keep the current field names, you need to add


public String getItemId() {
   return itemId;
}

public void setItemId(String itemId) {
   this.itemId = itemId;
}

public void setPrice(int price) {
   this.price = price;
}

public void setUserId(String userId) {
   this.userId = userId;
}

public int getPrice() {
   return price;
}

public String getUserId() {
   return userId;
}

As you can see, none of your getters/setters is according to Java Beans
specification and you need to add them all.

On Fri, Oct 2, 2020 at 4:39 PM 大森林 <ap...@foxmail.com> wrote:

>
> I want to do an experiment with the operator "aggregate"
>
> My code is:
>
> Aggregate.java
> https://paste.ubuntu.com/p/vvMKqZXt3r/
>
> UserActionLogPOJO.java
> https://paste.ubuntu.com/p/rfszzKbxDC/
>
>
> The error I got is:
>
> Exception in thread "main"
> org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
> Cannot reference field by field expression on
> GenericType<UserActionLogPOJO>Field expressions are only supported on POJO
> types, tuples, and case classes. (See the Flink documentation on what is
> considered a POJO.)
>     at
> org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
>     at
> org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:67)
>     at
> org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
>     at Aggregate.main(Aggregate.java:52)
>
> Process finished with exit code 1
>
> *Could you tell me where I am wrong with UserActionLogPOJO.java?*
>
> *Thanks for your help*
>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng