Posted to user@flink.apache.org by Georg Heiler <ge...@gmail.com> on 2020/07/09 20:44:21 UTC
MalformedClassName for scala case class
Hi,
Why can't I register the stream as a table? When I try, I get a
MalformedClassName exception.
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw-json",
    serializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)

case class Foo(lang: String, count: Int)
val r = stream
  .map(e => {
    Foo(e.get("value").get("lang").asText(), 1)
  })
  .keyBy(_.lang)
  .timeWindow(Time.seconds(10))
  .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)
Best,
Georg
Re: MalformedClassName for scala case class
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
could you please post the full stack trace of the exception and also let us
know which Flink version you're using?
I have tried the following code and it works on
master/flink-1.11/flink-1.10:
case class Foo(lang: String, count: Int)

def main(args: Array[String]): Unit = {
  val senv = StreamExecutionEnvironment.getExecutionEnvironment
  val stenv = StreamTableEnvironment.create(senv)
  val source = senv.fromElements("Hello", "ciao")
  val mapped = source.map(e => Foo(e, 13))
  stenv.registerDataStream("foo", mapped)
  senv.execute()
}
It's not exactly your code but pretty similar and I use the same case class.
Best,
Aljoscha