Posted to user@flink.apache.org by Ana Gómez González <an...@gmail.com> on 2023/03/06 20:57:46 UTC

Create generic DeserializationSchema (Scala)

Hello!

This is my first time posting a question to this mailing list, so I hope I'm
not messing anything up.
I'm not fully sure whether what I want to do is conceptually correct, so
please let me know.

I want to create a generic class that extends DeserializationSchema. I
want an easy way of creating different deserialization schemas for my
RabbitMQ sources, from JSON to Scala case classes.

My first approach looks like this:

import java.io.IOException

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}

class GenericJsonSchema[T] extends DeserializationSchema[T] {

  private val typeInformation: TypeInformation[T] = TypeInformation.of(new TypeHint[T] {})
  private val objectMapper: JsonMapper = JsonMapper.builder()
    .addModule(DefaultScalaModule)
    .build()

  @throws(classOf[IOException])
  def deserialize(message: Array[Byte]): T =
    objectMapper.readValue(message, typeInformation.getTypeClass)

  def isEndOfStream(nextElement: T): Boolean = false

  def getProducedType: TypeInformation[T] = typeInformation
}


When I run it, I get:


*Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
The TypeHint is using a generic variable.This is not supported, generic
types must be fully specified for the TypeHint.*

I've read about the problems with generic types and the TypeInformation
class and tried to understand them, but I still don't see how to use it
correctly, or whether it can be used for my purpose at all.


Thanks a lot in advance


*Ana Gómez González*
<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>

Re: Create generic DeserializationSchema (Scala)

Posted by Ana Gómez González <an...@gmail.com>.
Thank you Alexey!
It worked perfectly. I was missing the correct use of ClassTag.



*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>



Re: Create generic DeserializationSchema (Scala)

Posted by Alexey Novakov via user <us...@flink.apache.org>.
Hi Ana,

I think you will need to use a ClassTag to keep all the code generic.
I found an example that should help:

https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala

object JsonDeserializationSchema {
  private val objectMapper = JsonMapper.builder().addModule(DefaultScalaModule).build()
}

class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T] {
  override def deserialize(bytes: Array[Byte]): T =
    JsonDeserializationSchema.objectMapper.readValue[T](bytes, classTag[T].runtimeClass.asInstanceOf[Class[T]])

  override def getProducedType: TypeInformation[T] =
    TypeExtractor.getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]])

  ...

}
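
To illustrate why the ClassTag context bound helps, here is a minimal sketch that uses only the Scala standard library, with no Flink or Jackson. The names SensorReading, ClassTagSketch, runtimeClassOf and Holder are made up for this illustration and do not appear in the linked project. The idea is that the compiler supplies a ClassTag[T] at the point where the concrete type is known, so the generic class can still recover the runtime class even though T itself is erased.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical case class standing in for a JSON message type.
final case class SensorReading(id: String, value: Double)

object ClassTagSketch {
  // The context bound `T: ClassTag` makes the compiler pass a ClassTag[T]
  // implicitly from the call site, where T is a concrete type.
  def runtimeClassOf[T: ClassTag]: Class[T] =
    classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Hypothetical generic holder with the same shape as the schema class:
  // it resolves its target class from the ClassTag captured at construction.
  class Holder[T: ClassTag] {
    val targetClass: Class[T] = runtimeClassOf[T]
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder[SensorReading]
    // Prints the runtime class name recovered through the ClassTag.
    println(holder.targetClass.getName)
  }
}
```

With the schema above, the same mechanism applies when you write something like new JsonDeserializationSchema[MyEvent], where MyEvent is a placeholder for your own case class: the ClassTag is resolved at that call site, which is exactly the information an anonymous TypeHint[T] created inside a generic class does not have.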

-----------

Alexey
