Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2020/03/07 01:17:05 UTC
Spark not able to read from an Embedded Kafka Topic
I am trying to write an integration test using Embedded Kafka, but I keep
getting a NullPointerException. My test case is very simple. It has the
following steps:
1. Read a JSON file & write its messages to an inputTopic.
2. Perform a 'readStream' operation.
3. Do a 'select' on the stream. This throws a NullPointerException.
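Step 1, taken on its own, can be sketched in plain Scala with a resource-safe read (the file path and helper name here are illustrative, not from the test; the Kafka publish is replaced by a callback so the sketch stays self-contained):

```scala
import scala.io.Source
import scala.util.Using

object PublishSketch {
  // Read the non-empty lines of a JSON-lines file. Using.resource closes
  // the Source even if getLines throws; trim drops whitespace-only lines.
  def readNonEmptyLines(path: String): List[String] =
    Using.resource(Source.fromFile(path)) { src =>
      src.getLines().toList.filterNot(_.trim.isEmpty)
    }

  def main(args: Array[String]): Unit = {
    val tmp = java.nio.file.Files.createTempFile("test1", ".json")
    java.nio.file.Files.writeString(tmp, "{\"a\":1}\n\n{\"a\":2}\n")
    // Each surviving line would be handed to publishStringMessageToKafka.
    readNonEmptyLines(tmp.toString).foreach(line => println(line))
  }
}
```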
What am I doing wrong? The code is given below:
"My Test which runs with Embedded Kafka" should "Generate correct Result" in {
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(
kafkaPort = 9066,
zooKeeperPort = 2066,
Map("log.dir" -> "./src/test/resources/")
)
withRunningKafka {
createCustomTopic(inputTopic)
val source = Source.fromFile("src/test/resources/test1.json")
source.getLines.toList.filterNot(_.isEmpty).foreach(
line => publishStringMessageToKafka(inputTopic, line)
)
source.close()
implicit val deserializer: StringDeserializer = new StringDeserializer
createCustomTopic(outputTopic)
import spark2.implicits._
val schema = spark.read.json("my.json").schema
val myStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9066")
.option("subscribe", inputTopic)
.load()
// Schema looks good
myStream.printSchema()
// Following line throws NULLPointerException! Why?
val df = myStream.select(from_json($"value".cast("string"),
schema).alias("value"))
// There's more code... but let's not worry about that for now.
}
}
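For comparison, here is a minimal, self-contained sketch (not the poster's code; the field name and object name are illustrative) of the 'select' step: casting a Kafka-style `value` column to string and applying `from_json` with a schema declared by hand. Building the schema explicitly, rather than inferring it from a file on disk, rules out one plausible source of a null `schema` at select time:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FromJsonSketch {
  // Schema declared explicitly, so it cannot be null when the select runs.
  val schema: StructType = StructType(Seq(StructField("name", StringType)))

  // Parse JSON strings the way a Kafka `value` column is usually handled:
  // cast to string, from_json with the schema, then project the field out.
  def parseNames(spark: SparkSession, jsons: Seq[String]): Seq[String] = {
    import spark.implicits._
    jsons.toDF("value")
      .select(from_json(col("value").cast("string"), schema).alias("value"))
      .select("value.name")
      .as[String]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from_json-sketch")
      .getOrCreate()
    println(parseNames(spark, Seq("""{"name":"a"}""")))
    spark.stop()
  }
}
```

This uses a static DataFrame rather than a stream purely so the sketch can be run and checked in isolation; the `from_json` call is identical in the streaming case.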