You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Siddhesh Kalgaonkar <ka...@gmail.com> on 2022/01/05 11:04:32 UTC

Passing msg and record to the process function

I have written a process function where I am parsing the JSON and if it is
not according to the expected format it passes as Failure to the process
function and I print the records which are working fine. Now, I was trying
to print the message and the record in case of Success and Failure. I
implemented the below code and it gave me the error. What exactly I am
missing?

package KafkaAsSource

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class JSONParsingProcessFunction extends ProcessFunction[String,String] {
  val outputTag = new OutputTag[String]("failed")

  def parseJson(json: String): Either[String, String] = {
    val schemaJsonString =
      """
{
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "Product",
    "description": "A product from the catalog",
    "type": "object",
    "properties": {
        "id": {
            "description": "The unique identifier for a product",
            "type": "integer"
        },
        "premium": {
            "description": "Annual Premium",
            "type": "integer"
        },
        "eventTime": {
            "description": "Timestamp at which record has arrived at
source / generated",
            "type": "string"
        }
    },
    "required": ["id", "premium","eventTime"]
}
"""
    Try {
      val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(json)
      val validationMessages = schema.validate(parsedJson).asScala
      //validationMessages.foreach(msg => println(msg.getMessage))
      require(validationMessages.isEmpty)
      //parsedJson.toString()
      if(validationMessages.isEmpty)
        {
          (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
        }
      else
        {
          (parsedJson.toString(),"Format is correct...")
        }

    }
    match {
      case Success(x) => {
        println("Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }
  override def processElement(i: String, context:
ProcessFunction[String, String]#Context, collector:
Collector[String]): Unit = {
    parseJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}


Error:

type mismatch;
 found   : (String, Any)
 required: String
        Right(x)

Re: Passing msg and record to the process function

Posted by Siddhesh Kalgaonkar <ka...@gmail.com>.
I was able to modify the code and get the tuple in case of Success. How do
I pass the tuple to the Failure part?

try
{
  //
  //some processing

if (!validationMessages.isEmpty) {
        (parsedJson.toString(), validationMessages.foreach(x => {
          val msg: String = x.getMessage
          msg
        }).toString())
      }
      else {
        (parsedJson.toString(), "Good Record...")
      }

    }
    match {
      case Success(x) => {
        Right(x)
      }
      case Failure(err) => {
        Left(json)
      }
    }


On Thu, Jan 6, 2022 at 1:43 PM Siddhesh Kalgaonkar <
kalgaonkarsiddhesh@gmail.com> wrote:

> Thanks, Caizhi for your explanation. It helped me to understand where I
> went wrong.
>
> On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> The last expression in your try block is
>>
>> if(validationMessages.isEmpty) {
>>   (parsedJson.toString(),
>> validationMessages.foreach((msg=>msg.getMessage.toString)))
>> } else {
>>   (parsedJson.toString(), "Format is correct...")
>> }
>>
>> The first one produces a (String, Unit) type while the second one
>> produces a (String, String) type, so the whole if expression produces
>> (String, Any) type. However your parseJson should return Either[String,
>> String], thus causing this issue.
>>
>>
>> Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:
>>
>>> I have written a process function where I am parsing the JSON and if it
>>> is not according to the expected format it passes as Failure to the process
>>> function and I print the records which are working fine. Now, I was trying
>>> to print the message and the record in case of Success and Failure. I
>>> implemented the below code and it gave me the error. What exactly I am
>>> missing?
>>>
>>> package KafkaAsSource
>>>
>>> import com.fasterxml.jackson.databind.ObjectMapper
>>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>>> import org.apache.flink.api.scala.createTypeInformation
>>> import org.apache.flink.streaming.api.functions.ProcessFunction
>>> import org.apache.flink.streaming.api.scala.OutputTag
>>> import org.apache.flink.util.Collector
>>> import scala.jdk.CollectionConverters._
>>> import scala.util.{Failure, Success, Try}
>>>
>>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>>   val outputTag = new OutputTag[String]("failed")
>>>
>>>   def parseJson(json: String): Either[String, String] = {
>>>     val schemaJsonString =
>>>       """
>>> {
>>>     "$schema": "http://json-schema.org/draft-04/schema#",
>>>     "title": "Product",
>>>     "description": "A product from the catalog",
>>>     "type": "object",
>>>     "properties": {
>>>         "id": {
>>>             "description": "The unique identifier for a product",
>>>             "type": "integer"
>>>         },
>>>         "premium": {
>>>             "description": "Annual Premium",
>>>             "type": "integer"
>>>         },
>>>         "eventTime": {
>>>             "description": "Timestamp at which record has arrived at source / generated",
>>>             "type": "string"
>>>         }
>>>     },
>>>     "required": ["id", "premium","eventTime"]
>>> }
>>> """
>>>     Try {
>>>       val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>>       // You can read a JSON object from String, a file, URL, etc.
>>>       val parsedJson = new ObjectMapper().readTree(json)
>>>       val validationMessages = schema.validate(parsedJson).asScala
>>>       //validationMessages.foreach(msg => println(msg.getMessage))
>>>       require(validationMessages.isEmpty)
>>>       //parsedJson.toString()
>>>       if(validationMessages.isEmpty)
>>>         {
>>>           (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>>         }
>>>       else
>>>         {
>>>           (parsedJson.toString(),"Format is correct...")
>>>         }
>>>
>>>     }
>>>     match {
>>>       case Success(x) => {
>>>         println("Good: " + x)
>>>         Right(x)
>>>       }
>>>       case Failure(err) => {
>>>         println("Bad:  " + json)
>>>         Left(json)
>>>       }
>>>     }
>>>   }
>>>   override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>>>     parseJson(i) match {
>>>       case Right(data) => {
>>>         collector.collect(data)
>>>         println("Good Records: " + data)
>>>       }
>>>       case Left(json) => {
>>>         context.output(outputTag, json)
>>>         println("Bad Records: " + json)
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>>
>>> Error:
>>>
>>> type mismatch;
>>>  found   : (String, Any)
>>>  required: String
>>>         Right(x)
>>>
>>>

Re: Passing msg and record to the process function

Posted by Siddhesh Kalgaonkar <ka...@gmail.com>.
Thanks, Caizhi for your explanation. It helped me to understand where I
went wrong.

On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
>   (parsedJson.toString(),
> validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
>   (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:
>
>> I have written a process function where I am parsing the JSON and if it
>> is not according to the expected format it passes as Failure to the process
>> function and I print the records which are working fine. Now, I was trying
>> to print the message and the record in case of Success and Failure. I
>> implemented the below code and it gave me the error. What exactly I am
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>   val outputTag = new OutputTag[String]("failed")
>>
>>   def parseJson(json: String): Either[String, String] = {
>>     val schemaJsonString =
>>       """
>> {
>>     "$schema": "http://json-schema.org/draft-04/schema#",
>>     "title": "Product",
>>     "description": "A product from the catalog",
>>     "type": "object",
>>     "properties": {
>>         "id": {
>>             "description": "The unique identifier for a product",
>>             "type": "integer"
>>         },
>>         "premium": {
>>             "description": "Annual Premium",
>>             "type": "integer"
>>         },
>>         "eventTime": {
>>             "description": "Timestamp at which record has arrived at source / generated",
>>             "type": "string"
>>         }
>>     },
>>     "required": ["id", "premium","eventTime"]
>> }
>> """
>>     Try {
>>       val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>       // You can read a JSON object from String, a file, URL, etc.
>>       val parsedJson = new ObjectMapper().readTree(json)
>>       val validationMessages = schema.validate(parsedJson).asScala
>>       //validationMessages.foreach(msg => println(msg.getMessage))
>>       require(validationMessages.isEmpty)
>>       //parsedJson.toString()
>>       if(validationMessages.isEmpty)
>>         {
>>           (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>         }
>>       else
>>         {
>>           (parsedJson.toString(),"Format is correct...")
>>         }
>>
>>     }
>>     match {
>>       case Success(x) => {
>>         println("Good: " + x)
>>         Right(x)
>>       }
>>       case Failure(err) => {
>>         println("Bad:  " + json)
>>         Left(json)
>>       }
>>     }
>>   }
>>   override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>>     parseJson(i) match {
>>       case Right(data) => {
>>         collector.collect(data)
>>         println("Good Records: " + data)
>>       }
>>       case Left(json) => {
>>         context.output(outputTag, json)
>>         println("Bad Records: " + json)
>>       }
>>     }
>>   }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>>  found   : (String, Any)
>>  required: String
>>         Right(x)
>>
>>

Re: Passing msg and record to the process function

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

The last expression in your try block is

if(validationMessages.isEmpty) {
  (parsedJson.toString(),
validationMessages.foreach((msg=>msg.getMessage.toString)))
} else {
  (parsedJson.toString(), "Format is correct...")
}

The first one produces a (String, Unit) type while the second one produces
a (String, String) type, so the whole if expression produces (String, Any)
type. However your parseJson should return Either[String, String], thus
causing this issue.


Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:

> I have written a process function where I am parsing the JSON and if it is
> not according to the expected format it passes as Failure to the process
> function and I print the records which are working fine. Now, I was trying
> to print the message and the record in case of Success and Failure. I
> implemented the below code and it gave me the error. What exactly I am
> missing?
>
> package KafkaAsSource
>
> import com.fasterxml.jackson.databind.ObjectMapper
> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala.OutputTag
> import org.apache.flink.util.Collector
> import scala.jdk.CollectionConverters._
> import scala.util.{Failure, Success, Try}
>
> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>   val outputTag = new OutputTag[String]("failed")
>
>   def parseJson(json: String): Either[String, String] = {
>     val schemaJsonString =
>       """
> {
>     "$schema": "http://json-schema.org/draft-04/schema#",
>     "title": "Product",
>     "description": "A product from the catalog",
>     "type": "object",
>     "properties": {
>         "id": {
>             "description": "The unique identifier for a product",
>             "type": "integer"
>         },
>         "premium": {
>             "description": "Annual Premium",
>             "type": "integer"
>         },
>         "eventTime": {
>             "description": "Timestamp at which record has arrived at source / generated",
>             "type": "string"
>         }
>     },
>     "required": ["id", "premium","eventTime"]
> }
> """
>     Try {
>       val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>       // You can read a JSON object from String, a file, URL, etc.
>       val parsedJson = new ObjectMapper().readTree(json)
>       val validationMessages = schema.validate(parsedJson).asScala
>       //validationMessages.foreach(msg => println(msg.getMessage))
>       require(validationMessages.isEmpty)
>       //parsedJson.toString()
>       if(validationMessages.isEmpty)
>         {
>           (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>         }
>       else
>         {
>           (parsedJson.toString(),"Format is correct...")
>         }
>
>     }
>     match {
>       case Success(x) => {
>         println("Good: " + x)
>         Right(x)
>       }
>       case Failure(err) => {
>         println("Bad:  " + json)
>         Left(json)
>       }
>     }
>   }
>   override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>     parseJson(i) match {
>       case Right(data) => {
>         collector.collect(data)
>         println("Good Records: " + data)
>       }
>       case Left(json) => {
>         context.output(outputTag, json)
>         println("Bad Records: " + json)
>       }
>     }
>   }
> }
>
>
> Error:
>
> type mismatch;
>  found   : (String, Any)
>  required: String
>         Right(x)
>
>