Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2020/03/07 01:17:05 UTC

Spark not able to read from an Embedded Kafka Topic

I am trying to write an integration test using Embedded Kafka, but I keep
getting a NullPointerException. My test case is very simple. It has the
following steps:

   1. Read a JSON file & write messages to an inputTopic.
   2. Perform a 'readStream' operation.
   3. Do a 'select' on the Stream. This throws a NullPointerException.

What am I doing wrong? The code is given below:


"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(
        kafkaPort = 9066,
        zooKeeperPort = 2066,
        customBrokerProperties = Map("log.dir" -> "./src/test/resources/")
      )
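
    // withRunningKafka (below) starts an in-memory Kafka broker and ZooKeeper
    // on the ports configured above, and tears both down when the block exits.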

    withRunningKafka {
      createCustomTopic(inputTopic)
      val source = Source.fromFile("src/test/resources/test1.json")
      source.getLines.toList.filterNot(_.isEmpty).foreach(
        line => publishStringMessageToKafka(inputTopic, line)
      )
      source.close()
      implicit val deserializer: StringDeserializer = new StringDeserializer
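      // Presumably needed by embedded-kafka's consume* helpers in the elided
      // code below; the publish/select path shown here does not use it.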

      createCustomTopic(outputTopic)
      import spark2.implicits._

      val schema = spark.read.json("my.json").schema
      val myStream = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9066")
        .option("subscribe", inputTopic)
        .load()
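
      // Note: the streaming Kafka source defaults to startingOffsets = "latest",
      // so messages published before the query starts can be skipped. That is
      // unrelated to the NPE below, but it matters once the query actually runs.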

      // Schema looks good
      myStream.printSchema()

      // The following line throws a NullPointerException! Why?
      val df = myStream.select(
        from_json($"value".cast("string"), schema).alias("value")
      )

      // There's more code... but let's not worry about that for now.
    }

  }
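
For anyone trying to narrow this down, here is a minimal sketch (not the
original poster's code) that runs the same from_json projection against a
batch read of the topic. If this also throws, the problem is in the schema
or the column expression rather than in the streaming source. It assumes the
same spark session, inputTopic, and schema values as in the test above, and
uses col(...) instead of the $ interpolator so it does not depend on the
implicits import:

import org.apache.spark.sql.functions.{col, from_json}

// Sanity check: from_json needs a non-null schema; a null schema is one
// plausible way to hit a NullPointerException at select time.
assert(schema != null, "schema inferred from my.json is null")

// Batch read of the same topic. Batch Kafka reads default to
// startingOffsets = "earliest", so the messages published above are visible.
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9066")
  .option("subscribe", inputTopic)
  .load()

// The same projection that fails on the stream:
batch
  .select(from_json(col("value").cast("string"), schema).alias("value"))
  .show(truncate = false)

If the batch version succeeds, the failure is specific to the streaming path.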