You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Siddhesh Kalgaonkar <ka...@gmail.com> on 2022/01/04 18:35:06 UTC
Error while writing process functions
After a lot of struggle with the pure Jackson library which doesn't have a
strict mode within it due to which I wasn't able to validate the JSON
schema. I finally found one way of doing it but now I am not able to map
the correct *Success* and *Failure* messages in order to call the Process
Function.
Below is my code:
case class Premium(id: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String,Premium] {
val outputTag = new OutputTag[String]("failed")
def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
Try {
val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
validationMessages.foreach(msg => println(msg.getMessage))
} match {
case Success(x) => {
println(" Good: " + x)
Right(x)
}
case Failure(err) => {
println("Bad: " + json)
Left(json)
}
}
}
override def processElement(i: String, context:
ProcessFunction[String, Premium]#Context, collector:
Collector[Premium]): Unit = {
fromJson(i) match {
case Right(data) => {
collector.collect(data)
println("Good Records: " + data)
}
case Left(json) => {
context.output(outputTag, json)
println("Bad Records: " + json)
}
}
}
}
Error:
type mismatch;
found : x.type (with underlying type Unit)
required: T
Right(x)
Re: Error while writing process functions
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
The last expression in your try block is validationMessages.foreach(msg =>
println(msg.getMessage)), which does not produce anything. You need to use
an expression that produces type T in your try block.
Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 02:35写道:
> After a lot of struggle with the pure Jackson library which doesn't have a
> strict mode within it due to which I wasn't able to validate the JSON
> schema. I finally found one way of doing it but now I am not able to map
> the correct *Success* and *Failure* messages in order to call the Process
> Function.
>
> Below is my code:
>
> case class Premium(id: String, premium: Long, eventTime: String)
>
> class Splitter extends ProcessFunction[String,Premium] {
> val outputTag = new OutputTag[String]("failed")
>
> def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
> Try {
> val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
> // You can read a JSON object from String, a file, URL, etc.
> val parsedJson = new ObjectMapper().readTree(sampleJsonString)
> val validationMessages = schema.validate(parsedJson).asScala
> validationMessages.foreach(msg => println(msg.getMessage))
> } match {
> case Success(x) => {
> println(" Good: " + x)
> Right(x)
> }
> case Failure(err) => {
> println("Bad: " + json)
> Left(json)
> }
> }
> }
>
> override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
> fromJson(i) match {
> case Right(data) => {
> collector.collect(data)
> println("Good Records: " + data)
> }
> case Left(json) => {
> context.output(outputTag, json)
> println("Bad Records: " + json)
> }
> }
> }
> }
>
> Error:
>
> type mismatch;
> found : x.type (with underlying type Unit)
> required: T
> Right(x)
>
>