Posted to user@flink.apache.org by Utopia <ge...@gmail.com> on 2020/01/15 02:47:31 UTC

Question about Scala Case Class and List in Flink

Hi folks,

I have two questions about types in Flink when using Scala:

1. Scala case class:

This is my case class definition:
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

According to the documentation, Scala case classes are supported:
`Scala case classes (including Scala tuples): null fields not supported`

But the log info shows:
10:26:08,489 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class io.github.streamingwithflink.util.SensorReading is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.


2. Scala List:

This is my case class definition:
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double, var list: List[String] = List[String]())
The log shows:
No fields were detected for class scala.collection.immutable.List so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Does this mean that a Scala List can still be serialized, so I can ignore this INFO message if I don't care about performance?
Should I use a Java ArrayList instead of a Scala List, or create a custom serializer for the SensorReading case class?

Thanks!


Best regards
Utopia

Re: Question about Scala Case Class and List in Flink

Posted by Xavier <xa...@gmail.com>.
Thanks for your suggestions!

On Fri, Feb 5, 2021 at 11:16 PM Timo Walther <tw...@apache.org> wrote:

> Dealing with types is not always easy in Flink. If you have further
> issues, it might make sense to pass the type information explicitly. We
> list all types in:
>
> org.apache.flink.api.common.typeinfo.Types
>
> org.apache.flink.api.scala.typeutils.Types
>
> Regards,
> Timo

-- 

Best Regards,
*Xavier*

Re: Question about Scala Case Class and List in Flink

Posted by Timo Walther <tw...@apache.org>.
Dealing with types is not always easy in Flink. If you have further
issues, it might make sense to pass the type information explicitly. We
list all types in:

org.apache.flink.api.common.typeinfo.Types

org.apache.flink.api.scala.typeutils.Types
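As a minimal sketch of passing type information explicitly, assuming the Flink 1.12-era Scala DataStream API and a hypothetical `SensorReading` case class modeled on the one from the original question: the Scala `map` takes its result `TypeInformation` in a second, normally implicit, argument list, so a value from `org.apache.flink.api.common.typeinfo.Types` can be supplied there directly. `ExplicitTypesExample` and `idStreamType` are illustrative names, and the pipeline is only built, not executed.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._

// Hypothetical record type, modeled on the question's SensorReading.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ExplicitTypesExample {
  /** Builds (but does not execute) a tiny pipeline and returns the element
    * type of the mapped stream. */
  def idStreamType(): TypeInformation[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // TypeInformation[SensorReading] is still derived implicitly here.
    val readings: DataStream[SensorReading] =
      env.fromElements(SensorReading("sensor_1", 1L, 21.5))

    // The result type of map is passed explicitly in the second
    // (normally implicit) argument list instead of being derived.
    val ids: DataStream[String] =
      readings.map((r: SensorReading) => r.id)(Types.STRING)

    ids.dataType
  }

  def main(args: Array[String]): Unit = println(idStreamType())
}
```

The same pattern should apply to other result types, for example `Types.LIST(Types.STRING)` for a `java.util.List[String]`.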

Regards,
Timo

On 05.02.21 16:04, Xavier wrote:
> Hi Timo,
>      Thank you for your clarification, it is very useful to me. Together
> with implementing the map function, I am also trying to do an implicit
> conversion of the case class so that I can restore state from FS.


Re: Question about Scala Case Class and List in Flink

Posted by Xavier <xa...@gmail.com>.
Hi Timo,
    Thank you for your clarification, it is very useful to me. Together with
implementing the map function, I am also trying to do an implicit conversion
of the case class so that I can restore state from FS.

On Fri, Feb 5, 2021 at 10:38 PM Timo Walther <tw...@apache.org> wrote:

> Hi Xavier,
>
> the Scala API has special implicits in methods such as `DataStream.map()`
> or `DataStream.keyBy()` to support Scala specifics like case classes. For
> Scala, one needs to use the macro `createTypeInformation[CaseClass]`; for
> Java, we use reflection via `TypeInformation.of()`. But the Scala and Java
> analyses are completely different, so you cannot use a case class in the
> Java API. Scala will fall back to Java though.
>
> I hope this helps.
>
> Regards,
> Timo

-- 

Best Regards,
*Xavier*

Re: Question about Scala Case Class and List in Flink

Posted by Timo Walther <tw...@apache.org>.
Hi Xavier,

the Scala API has special implicits in methods such as `DataStream.map()`
or `DataStream.keyBy()` to support Scala specifics like case classes. For
Scala, one needs to use the macro `createTypeInformation[CaseClass]`; for
Java, we use reflection via `TypeInformation.of()`. But the Scala and Java
analyses are completely different, so you cannot use a case class in the
Java API. Scala will fall back to Java though.
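To illustrate these implicits, here is a sketch assuming Flink 1.12-era Scala APIs (the object and method names are hypothetical). Both `map` and `keyBy` receive their `TypeInformation` from the `createTypeInformation` macro that `import org.apache.flink.streaming.api.scala._` brings into scope; inspecting the stream's `dataType` is one way to confirm that a case class was analyzed as a case class rather than as a generic type.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ImplicitTypesExample {
  /** Builds (without executing) a pipeline whose map and keyBy calls rely
    * on implicit TypeInformation produced by the Scala API's macro. */
  def keyedType(): TypeInformation[SensorReading] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val raw: DataStream[String] = env.fromElements("sensor_1,1,21.5")

    // map[R: TypeInformation]: TypeInformation[SensorReading] is generated
    // at compile time by createTypeInformation.
    val readings: DataStream[SensorReading] = raw.map { line =>
      val Array(id, ts, temp) = line.split(",")
      SensorReading(id, ts.toLong, temp.toDouble)
    }

    // keyBy[K: TypeInformation]: the String key type is derived implicitly too.
    val keyed: KeyedStream[SensorReading, String] = readings.keyBy(_.id)
    keyed.dataType
  }

  def main(args: Array[String]): Unit = {
    val info = keyedType()
    println(s"$info -> case class? ${info.isInstanceOf[CaseClassTypeInfo[_]]}")
  }
}
```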

I hope this helps.

Regards,
Timo


On 05.02.21 10:54, Xavier wrote:
> Hi Utopia,
>     Have you fixed this problem? I ran into it too, so I converted the
> case class to a Java POJO, and that fixed it.


Re: Question about Scala Case Class and List in Flink

Posted by Xavier <xa...@gmail.com>.
Hi Utopia,
   Have you fixed this problem? I ran into it too, so I converted the
case class to a Java POJO, and that fixed it.
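For reference, a sketch of what a POJO-style replacement might look like, written in Scala rather than Java to keep one language. It assumes Flink's documented POJO rules (public class, public no-argument constructor, fields that are public or have getters and setters) and that the Java `TypeExtractor` accepts Scala's generated `id()` / `id_$eq(...)` accessors; the "missing a default constructor" message in the original question suggests the `var` fields themselves already passed that check. `SensorReadingPojo` and `PojoExample` are hypothetical names.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo

// Top-level, public class with mutable fields (Scala generates getter and
// setter methods for each var).
class SensorReadingPojo(var id: String, var timestamp: Long, var temperature: Double) {
  // Public no-argument constructor required by Flink's POJO rules.
  def this() = this(null, 0L, 0.0)
}

object PojoExample {
  // Java-style reflective extraction, as used by the Java API.
  def pojoType(): TypeInformation[SensorReadingPojo] =
    TypeInformation.of(classOf[SensorReadingPojo])

  def main(args: Array[String]): Unit = {
    val info = pojoType()
    println(s"$info -> POJO? ${info.isInstanceOf[PojoTypeInfo[_]]}")
  }
}
```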



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Question about Scala Case Class and List in Flink

Posted by Averell <lv...@gmail.com>.
Hi Timo,

This is my case class:
case class Box[T](meta: Metadata, value: T) {
  def function1: A => B = {...}
  def method2(...): A = {...}
}

However, I still get the warning "Class class data.package$Box cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance."

I imported `org.apache.flink.streaming.api.scala._`. Is this enough to
tell Flink that I am using the Scala API?
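A sketch for checking what the Scala API derives for a generic case class like `Box[T]`, assuming Flink 1.12-era APIs. `Metadata` and `Box` here are simplified, hypothetical stand-ins (the methods are left out because they are not fields). When `T` is abstract, a `TypeInformation` context bound is one common way to let the macro reuse the caller's type information; this reflects typical Scala API usage, not an answer given in this thread.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._

// Hypothetical simplified versions of the types in the question.
case class Metadata(source: String)
case class Box[T](meta: Metadata, value: T)

object GenericBoxExample {
  // The context bound puts TypeInformation[T] in implicit scope, so the
  // createTypeInformation macro can build TypeInformation[Box[T]].
  def boxType[T: TypeInformation]: TypeInformation[Box[T]] =
    createTypeInformation[Box[T]]

  def main(args: Array[String]): Unit = {
    val info = boxType[Double]
    println(s"$info -> case class? ${info.isInstanceOf[CaseClassTypeInfo[_]]}")
  }
}
```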

Thanks and regards,
Averell


Re: Question about Scala Case Class and List in Flink

Posted by Timo Walther <tw...@apache.org>.
Hi,

Reg. 1:

Scala case classes are supported in the Scala-specific version of the
DataStream API. If you are using case classes in the Java API, you will
get the INFO message you quoted, because the Java API uses pure reflection
extraction for analyzing POJOs.

The Scala API tries to analyze Scala classes first; if this is not
possible, it falls back to Java reflection extraction. So in your case
the INFO message should not appear, because it is a pure Scala case class.
Is it used within a non-case class?
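A minimal sketch of that difference, assuming Flink 1.12-era APIs and the `var`-based `SensorReading` from the original question: the Scala macro should produce a `CaseClassTypeInfo`, while the Java reflection path (`TypeInformation.of`) should log the "missing a default constructor" INFO line and return a `GenericTypeInfo`. `ScalaVsJavaExtraction` is a hypothetical name.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._

// The case class from the original question.
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

object ScalaVsJavaExtraction {
  // Scala API: the createTypeInformation macro recognizes the case class.
  val scalaInfo: TypeInformation[SensorReading] = createTypeInformation[SensorReading]

  // Java API: reflection finds no default constructor, logs the INFO line
  // and falls back to a generic (Kryo-serialized) type.
  val javaInfo: TypeInformation[SensorReading] = TypeInformation.of(classOf[SensorReading])

  def main(args: Array[String]): Unit = {
    println(s"Scala API: $scalaInfo (case class: ${scalaInfo.isInstanceOf[CaseClassTypeInfo[_]]})")
    println(s"Java API: $javaInfo (generic: ${javaInfo.isInstanceOf[GenericTypeInfo[_]]})")
  }
}
```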

Reg 2:

Most classes can be serialized by Flink. That's why these log lines are
only at INFO level: falling back to a generic type might affect
performance slightly, but serialization still works. If you are
performance-sensitive, I would recommend primitive types, arrays and
case classes.
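One way to find out whether a type silently fell back to a generic (Kryo-serialized) type is `ExecutionConfig.disableGenericTypes()`. In the sketch below (hypothetical names; `TaggedReading` swaps the question's `List[String]` for an `Array[String]`, following the recommendation above), serializers are created directly against such a config purely for illustration. In a real job one would call `env.getConfig.disableGenericTypes()` and expect an error when the job's serializers are created, instead of a silent fallback.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Hypothetical variant of the question's record using an Array field.
case class TaggedReading(id: String, temperature: Double, tags: Array[String])

object GenericTypeCheck {
  /** Returns true if a serializer for `info` can be created while generic
    * (Kryo) types are disabled. */
  def worksWithoutKryo[T](info: TypeInformation[T]): Boolean = {
    val config = new ExecutionConfig
    config.disableGenericTypes()
    try {
      info.createSerializer(config)
      true
    } catch {
      // GenericTypeInfo refuses to create a Kryo serializer in this mode.
      case _: UnsupportedOperationException => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"Scala API: ${worksWithoutKryo(createTypeInformation[TaggedReading])}")
    println(s"Java API: ${worksWithoutKryo(TypeInformation.of(classOf[TaggedReading]))}")
  }
}
```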

Regards,
Timo