You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Siddhesh Kalgaonkar <ka...@gmail.com> on 2022/01/05 11:04:32 UTC
Passing msg and record to the process function
I have written a process function where I am parsing the JSON and if it is
not according to the expected format it passes as Failure to the process
function and I print the records which are working fine. Now, I was trying
to print the message and the record in case of Success and Failure. I
implemented the below code and it gave me the error. What exactly I am
missing?
package KafkaAsSource
import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
class JSONParsingProcessFunction extends ProcessFunction[String,String] {
val outputTag = new OutputTag[String]("failed")
def parseJson(json: String): Either[String, String] = {
val schemaJsonString =
"""
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Product",
"description": "A product from the catalog",
"type": "object",
"properties": {
"id": {
"description": "The unique identifier for a product",
"type": "integer"
},
"premium": {
"description": "Annual Premium",
"type": "integer"
},
"eventTime": {
"description": "Timestamp at which record has arrived at
source / generated",
"type": "string"
}
},
"required": ["id", "premium","eventTime"]
}
"""
Try {
val schema =
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(json)
val validationMessages = schema.validate(parsedJson).asScala
//validationMessages.foreach(msg => println(msg.getMessage))
require(validationMessages.isEmpty)
//parsedJson.toString()
if(validationMessages.isEmpty)
{
(parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
}
else
{
(parsedJson.toString(),"Format is correct...")
}
}
match {
case Success(x) => {
println("Good: " + x)
Right(x)
}
case Failure(err) => {
println("Bad: " + json)
Left(json)
}
}
}
override def processElement(i: String, context:
ProcessFunction[String, String]#Context, collector:
Collector[String]): Unit = {
parseJson(i) match {
case Right(data) => {
collector.collect(data)
println("Good Records: " + data)
}
case Left(json) => {
context.output(outputTag, json)
println("Bad Records: " + json)
}
}
}
}
Error:
type mismatch;
found : (String, Any)
required: String
Right(x)
Re: Passing msg and record to the process function
Posted by Siddhesh Kalgaonkar <ka...@gmail.com>.
I was able to modify the code and get the tuple in case of Success. How do
I pass the tuple to the Failure part?
try
{
//
//some processing
if (!validationMessages.isEmpty) {
(parsedJson.toString(), validationMessages.foreach(x => {
val msg: String = x.getMessage
msg
}).toString())
}
else {
(parsedJson.toString(), "Good Record...")
}
}
match {
case Success(x) => {
Right(x)
}
case Failure(err) => {
Left(json)
}
}
On Thu, Jan 6, 2022 at 1:43 PM Siddhesh Kalgaonkar <
kalgaonkarsiddhesh@gmail.com> wrote:
> Thanks, Caizhi for your explanation. It helped me to understand where I
> went wrong.
>
> On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> The last expression in your try block is
>>
>> if(validationMessages.isEmpty) {
>> (parsedJson.toString(),
>> validationMessages.foreach((msg=>msg.getMessage.toString)))
>> } else {
>> (parsedJson.toString(), "Format is correct...")
>> }
>>
>> The first one produces a (String, Unit) type while the second one
>> produces a (String, String) type, so the whole if expression produces
>> (String, Any) type. However your parseJson should return Either[String,
>> String], thus causing this issue.
>>
>>
>> Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:
>>
>>> I have written a process function where I am parsing the JSON and if it
>>> is not according to the expected format it passes as Failure to the process
>>> function and I print the records which are working fine. Now, I was trying
>>> to print the message and the record in case of Success and Failure. I
>>> implemented the below code and it gave me the error. What exactly I am
>>> missing?
>>>
>>> package KafkaAsSource
>>>
>>> import com.fasterxml.jackson.databind.ObjectMapper
>>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>>> import org.apache.flink.api.scala.createTypeInformation
>>> import org.apache.flink.streaming.api.functions.ProcessFunction
>>> import org.apache.flink.streaming.api.scala.OutputTag
>>> import org.apache.flink.util.Collector
>>> import scala.jdk.CollectionConverters._
>>> import scala.util.{Failure, Success, Try}
>>>
>>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>> val outputTag = new OutputTag[String]("failed")
>>>
>>> def parseJson(json: String): Either[String, String] = {
>>> val schemaJsonString =
>>> """
>>> {
>>> "$schema": "http://json-schema.org/draft-04/schema#",
>>> "title": "Product",
>>> "description": "A product from the catalog",
>>> "type": "object",
>>> "properties": {
>>> "id": {
>>> "description": "The unique identifier for a product",
>>> "type": "integer"
>>> },
>>> "premium": {
>>> "description": "Annual Premium",
>>> "type": "integer"
>>> },
>>> "eventTime": {
>>> "description": "Timestamp at which record has arrived at source / generated",
>>> "type": "string"
>>> }
>>> },
>>> "required": ["id", "premium","eventTime"]
>>> }
>>> """
>>> Try {
>>> val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>> // You can read a JSON object from String, a file, URL, etc.
>>> val parsedJson = new ObjectMapper().readTree(json)
>>> val validationMessages = schema.validate(parsedJson).asScala
>>> //validationMessages.foreach(msg => println(msg.getMessage))
>>> require(validationMessages.isEmpty)
>>> //parsedJson.toString()
>>> if(validationMessages.isEmpty)
>>> {
>>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>> }
>>> else
>>> {
>>> (parsedJson.toString(),"Format is correct...")
>>> }
>>>
>>> }
>>> match {
>>> case Success(x) => {
>>> println("Good: " + x)
>>> Right(x)
>>> }
>>> case Failure(err) => {
>>> println("Bad: " + json)
>>> Left(json)
>>> }
>>> }
>>> }
>>> override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>>> parseJson(i) match {
>>> case Right(data) => {
>>> collector.collect(data)
>>> println("Good Records: " + data)
>>> }
>>> case Left(json) => {
>>> context.output(outputTag, json)
>>> println("Bad Records: " + json)
>>> }
>>> }
>>> }
>>> }
>>>
>>>
>>> Error:
>>>
>>> type mismatch;
>>> found : (String, Any)
>>> required: String
>>> Right(x)
>>>
>>>
Re: Passing msg and record to the process function
Posted by Siddhesh Kalgaonkar <ka...@gmail.com>.
Thanks, Caizhi for your explanation. It helped me to understand where I
went wrong.
On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <ts...@gmail.com> wrote:
> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
> (parsedJson.toString(),
> validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
> (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:
>
>> I have written a process function where I am parsing the JSON and if it
>> is not according to the expected format it passes as Failure to the process
>> function and I print the records which are working fine. Now, I was trying
>> to print the message and the record in case of Success and Failure. I
>> implemented the below code and it gave me the error. What exactly I am
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>> val outputTag = new OutputTag[String]("failed")
>>
>> def parseJson(json: String): Either[String, String] = {
>> val schemaJsonString =
>> """
>> {
>> "$schema": "http://json-schema.org/draft-04/schema#",
>> "title": "Product",
>> "description": "A product from the catalog",
>> "type": "object",
>> "properties": {
>> "id": {
>> "description": "The unique identifier for a product",
>> "type": "integer"
>> },
>> "premium": {
>> "description": "Annual Premium",
>> "type": "integer"
>> },
>> "eventTime": {
>> "description": "Timestamp at which record has arrived at source / generated",
>> "type": "string"
>> }
>> },
>> "required": ["id", "premium","eventTime"]
>> }
>> """
>> Try {
>> val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>> // You can read a JSON object from String, a file, URL, etc.
>> val parsedJson = new ObjectMapper().readTree(json)
>> val validationMessages = schema.validate(parsedJson).asScala
>> //validationMessages.foreach(msg => println(msg.getMessage))
>> require(validationMessages.isEmpty)
>> //parsedJson.toString()
>> if(validationMessages.isEmpty)
>> {
>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>> }
>> else
>> {
>> (parsedJson.toString(),"Format is correct...")
>> }
>>
>> }
>> match {
>> case Success(x) => {
>> println("Good: " + x)
>> Right(x)
>> }
>> case Failure(err) => {
>> println("Bad: " + json)
>> Left(json)
>> }
>> }
>> }
>> override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>> parseJson(i) match {
>> case Right(data) => {
>> collector.collect(data)
>> println("Good Records: " + data)
>> }
>> case Left(json) => {
>> context.output(outputTag, json)
>> println("Bad Records: " + json)
>> }
>> }
>> }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>> found : (String, Any)
>> required: String
>> Right(x)
>>
>>
Re: Passing msg and record to the process function
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
The last expression in your try block is
if(validationMessages.isEmpty) {
(parsedJson.toString(),
validationMessages.foreach((msg=>msg.getMessage.toString)))
} else {
(parsedJson.toString(), "Format is correct...")
}
The first one produces a (String, Unit) type while the second one produces
a (String, String) type, so the whole if expression produces (String, Any)
type. However your parseJson should return Either[String, String], thus
causing this issue.
Siddhesh Kalgaonkar <ka...@gmail.com> 于2022年1月5日周三 19:04写道:
> I have written a process function where I am parsing the JSON and if it is
> not according to the expected format it passes as Failure to the process
> function and I print the records which are working fine. Now, I was trying
> to print the message and the record in case of Success and Failure. I
> implemented the below code and it gave me the error. What exactly I am
> missing?
>
> package KafkaAsSource
>
> import com.fasterxml.jackson.databind.ObjectMapper
> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala.OutputTag
> import org.apache.flink.util.Collector
> import scala.jdk.CollectionConverters._
> import scala.util.{Failure, Success, Try}
>
> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
> val outputTag = new OutputTag[String]("failed")
>
> def parseJson(json: String): Either[String, String] = {
> val schemaJsonString =
> """
> {
> "$schema": "http://json-schema.org/draft-04/schema#",
> "title": "Product",
> "description": "A product from the catalog",
> "type": "object",
> "properties": {
> "id": {
> "description": "The unique identifier for a product",
> "type": "integer"
> },
> "premium": {
> "description": "Annual Premium",
> "type": "integer"
> },
> "eventTime": {
> "description": "Timestamp at which record has arrived at source / generated",
> "type": "string"
> }
> },
> "required": ["id", "premium","eventTime"]
> }
> """
> Try {
> val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
> // You can read a JSON object from String, a file, URL, etc.
> val parsedJson = new ObjectMapper().readTree(json)
> val validationMessages = schema.validate(parsedJson).asScala
> //validationMessages.foreach(msg => println(msg.getMessage))
> require(validationMessages.isEmpty)
> //parsedJson.toString()
> if(validationMessages.isEmpty)
> {
> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
> }
> else
> {
> (parsedJson.toString(),"Format is correct...")
> }
>
> }
> match {
> case Success(x) => {
> println("Good: " + x)
> Right(x)
> }
> case Failure(err) => {
> println("Bad: " + json)
> Left(json)
> }
> }
> }
> override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
> parseJson(i) match {
> case Right(data) => {
> collector.collect(data)
> println("Good Records: " + data)
> }
> case Left(json) => {
> context.output(outputTag, json)
> println("Bad Records: " + json)
> }
> }
> }
> }
>
>
> Error:
>
> type mismatch;
> found : (String, Any)
> required: String
> Right(x)
>
>