Posted to dev@flink.apache.org by "ASK5 (Jira)" <ji...@apache.org> on 2019/10/05 06:14:00 UTC

[jira] [Created] (FLINK-14327) Getting "Could not forward element to next operator" error

ASK5 created FLINK-14327:
----------------------------

             Summary: Getting "Could not forward element to next operator" error
                 Key: FLINK-14327
                 URL: https://issues.apache.org/jira/browse/FLINK-14327
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.9.0
            Reporter: ASK5
             Fix For: 1.9.0


val TEMPERATURE_THRESHOLD: Double = 50.00

val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("bootstrap.servers", "localhost:9092")

val src = see
  .addSource(new FlinkKafkaConsumer010[ObjectNode](
    "broadcast",
    new JSONKeyValueDeserializationSchema(false),
    properties))
  .name("kafkaSource")

case class Event(locationID: String, temp: Double)

var data = src.map { v =>
  val loc = v.get("locationID").asInstanceOf[String]
  val temperature = v.get("temp").asDouble()
  (loc, temperature)
}

data = data.keyBy(v => v._1)

data.print()

see.execute()

When I run this job against JSON messages from Kafka, I get the following error:

{{Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
    at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
    at flinkBroadcast1.main(flinkBroadcast1.scala)
Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException}}
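
A possible cause, offered as an assumption rather than a confirmed diagnosis: JSONKeyValueDeserializationSchema wraps each Kafka record in an ObjectNode whose top-level fields are "key" and "value" (plus "metadata" when enabled), so a top-level lookup such as v.get("temp") would return null and the following .asDouble() call would throw the NullPointerException. The sketch below reads the fields from under "value" and guards against missing ones. The helper name toLocationTemp is hypothetical, and the payload shape (a JSON object with "locationID" and "temp") is assumed from the job code in this report.

```scala
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode

// Hypothetical helper, not part of the original job.
// Assumes the message payload sits under the top-level "value" field.
def toLocationTemp(record: ObjectNode): Option[(String, Double)] =
  for {
    value <- Option(record.get("value"))      // None if the record carried no value
    loc   <- Option(value.get("locationID"))  // None if the field is absent
    temp  <- Option(value.get("temp"))        // None if the field is absent
  } yield (loc.asText(), temp.asDouble())     // asText() instead of asInstanceOf[String]

// Assumed usage in place of the original map (needs
// import org.apache.flink.streaming.api.scala._ for the implicit type information):
//   val data = src.flatMap(v => toLocationTemp(v).toList)
```

With this shape, records that lack the expected fields are dropped rather than failing the operator chain; whether dropping or logging them is appropriate depends on the job.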


