You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Siddhesh Kalgaonkar <ka...@gmail.com> on 2022/01/04 18:35:06 UTC

Error while writing process functions

After a lot of struggle with the pure Jackson library which doesn't have a
strict mode within it due to which I wasn't able to validate the JSON
schema. I finally found one way of doing it but now I am not able to map
the correct *Success* and *Failure* messages in order to call the Process
Function.

Below is my code:

case class Premium(id: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String,Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println(" Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context:
ProcessFunction[String, Premium]#Context, collector:
Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}

Error:

type mismatch;
 found   : x.type (with underlying type Unit)
 required: T
        Right(x)

Re: Error while writing process functions

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

The last expression in your try block is validationMessages.foreach(msg =>
println(msg.getMessage)), which does not produce anything. You need to use
an expression that produces type T in your try block.

Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 02:35写道:

> After a lot of struggle with the pure Jackson library which doesn't have a
> strict mode within it due to which I wasn't able to validate the JSON
> schema. I finally found one way of doing it but now I am not able to map
> the correct *Success* and *Failure* messages in order to call the Process
> Function.
>
> Below is my code:
>
> case class Premium(id: String, premium: Long, eventTime: String)
>
> class Splitter extends ProcessFunction[String,Premium] {
>   val outputTag = new OutputTag[String]("failed")
>
>   def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
>     Try {
>       val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>       // You can read a JSON object from String, a file, URL, etc.
>       val parsedJson = new ObjectMapper().readTree(sampleJsonString)
>       val validationMessages = schema.validate(parsedJson).asScala
>       validationMessages.foreach(msg => println(msg.getMessage))
>     } match {
>       case Success(x) => {
>         println(" Good: " + x)
>         Right(x)
>       }
>       case Failure(err) => {
>         println("Bad:  " + json)
>         Left(json)
>       }
>     }
>   }
>
>   override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
>     fromJson(i) match {
>       case Right(data) => {
>         collector.collect(data)
>         println("Good Records: " + data)
>       }
>       case Left(json) => {
>         context.output(outputTag, json)
>         println("Bad Records: " + json)
>       }
>     }
>   }
> }
>
> Error:
>
> type mismatch;
>  found   : x.type (with underlying type Unit)
>  required: T
>         Right(x)
>
>