Posted to user@flink.apache.org by Georg Heiler <ge...@gmail.com> on 2020/07/09 20:44:21 UTC

MalformedClassName for scala case class

Hi,

why can't I register the stream as a table? When I try, I get a
MalformedClassName exception.

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() // TODO experiment with different start values
  )

case class Foo(lang: String, count: Int)
val r = stream
    .map(e => {
      Foo(e.get("value").get("lang").asText(), 1)
    })
    .keyBy(_.lang)
    .timeWindow(Time.seconds(10))
    .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg

Re: MalformedClassName for scala case class

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

could you please post the stacktrace with the exception and also let us 
know which Flink version you're using?

I have tried the following code and it works on 
master/flink-1.11/flink-1.10:

   case class Foo(lang: String, count: Int)
   def main(args: Array[String]): Unit = {
     val senv = StreamExecutionEnvironment.getExecutionEnvironment
     val stenv = StreamTableEnvironment.create(senv)

     val source = senv.fromElements("Hello", "ciao")
     val mapped = source.map( e => {Foo(e, 13) } )
     stenv.registerDataStream("foo", mapped)

     senv.execute()
   }

It's not exactly your code but pretty similar and I use the same case class.
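
One difference that may matter, though this is only a guess until the stack trace is posted: in the code above Foo is declared outside of any method, whereas a case class declared in a REPL session or inside nested objects gets a deeper binary name. On Java 8, Class.getSimpleName can throw "InternalError: Malformed class name" for some nested Scala classes. The sketch below is plain Scala with no Flink involved. TopLevelFoo, Outer, Inner and SimpleNameCheck are hypothetical names used only for illustration, and whether the nested case fails depends on the JDK and Scala versions in use.

```scala
// A case class declared at the top level of a file.
case class TopLevelFoo(lang: String, count: Int)

// The same shape of case class, nested two objects deep (hypothetical names).
object Outer {
  object Inner {
    case class Foo(lang: String, count: Int)
  }
}

object SimpleNameCheck {
  // Returns the binary name plus either the simple name or the error raised
  // while computing it. getName never throws; getSimpleName may on some JDKs.
  def describe(cls: Class[_]): String =
    try s"${cls.getName} -> simple name '${cls.getSimpleName}'"
    catch {
      case e: InternalError =>
        s"${cls.getName} -> getSimpleName failed: ${e.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    println(describe(classOf[TopLevelFoo]))
    println(describe(classOf[Outer.Inner.Foo]))
  }
}
```

If the nested class reports a failure in your environment while the top-level one does not, moving Foo to the top level of a compiled file would be a cheap thing to try.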

Best,
Aljoscha
