Posted to user@spark.apache.org by Revin Chalil <rc...@expedia.com> on 2017/05/11 06:21:25 UTC

Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

I am trying to convert Avro records, whose "payload" field has type bytes, to JSON strings using Structured Streaming in Spark 2.1. Please see below.


import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.{Encoder, SparkSession}

object AvroConvert {

  case class KafkaMessage(
                           payload: String
                         )

  val schemaString =    """{
                            "type" : "record",
                            "name" : "HdfsEvent",
                            "namespace" : "com.abc.def.domain.hdfs",
                            "fields" : [ {
                              "name" : "payload",
                              "type" : {
                                "type" : "bytes",
                                "java-class" : "[B"
                              }
                            } ]
                          }"""
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Binary decoder
  val decoder = DecoderFactory.get()
  // Register implicit encoder for map operation
  implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

  def main(args: Array[String]) {

    val KafkaBroker = "**.**.**.**:9092"
    val InTopic = "avro"

    // Get Spark session
    val session = SparkSession
      .builder
      .master("local[*]")
      .appName("myapp")
      .getOrCreate()

    // Implicit encoders for Datasets of case classes and primitives
    import session.implicits._

    // Load streaming data from Kafka
    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .map(d => {
        val rec = reader.read(null, decoder.binaryDecoder(d, null))
        // Avro "bytes" fields deserialize to a java.nio.ByteBuffer, not a single Byte
        val payload = new String(rec.get("payload").asInstanceOf[java.nio.ByteBuffer].array(), "UTF-8")
        KafkaMessage(payload)
      })

    val query = data.writeStream
      .outputMode("Append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}


I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I read suggestions to use DataFileReader instead of binaryDecoder, as below, but was not successful using this in Scala.


DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);
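
A rough Scala equivalent of that snippet, for reference; it assumes the stream carries an Avro object-container file (written with DataFileWriter, schema and sync markers included), which a bare Kafka message value usually is not, and the input path is purely illustrative:

import java.io.{FileInputStream, InputStream}
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DatumReader

val inputStream: InputStream = new FileInputStream("events.avro")  // illustrative source
val datumReader: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](messageSchema)
val dataFileReader = new DataFileStream[GenericRecord](inputStream, datumReader)
while (dataFileReader.hasNext) {
  println(dataFileReader.next())  // each element is a decoded GenericRecord
}
dataFileReader.close()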


Once the bytes-typed "payload" is converted to JSON, I plan to write it back to another Kafka topic.
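
For that write-back step, a minimal sketch: the built-in Kafka sink for Structured Streaming only arrived in Spark 2.2 (on 2.1 a ForeachWriter wrapping a plain KafkaProducer is the usual workaround), and the output topic and checkpoint path below are illustrative:

val outQuery = data
  .selectExpr("payload AS value")                      // the Kafka sink expects a "value" column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("topic", "json-out")                         // illustrative output topic
  .option("checkpointLocation", "/tmp/chk/json-out")   // illustrative checkpoint path
  .start()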

Any help on this is much appreciated. Thank you!

Revin




Re: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

Posted by Michael Armbrust <mi...@databricks.com>.
I believe that Avro messages on Kafka have a few bytes at the beginning of the
message to denote which schema is being used. Have you tried using
the KafkaAvroDecoder inside the map instead?
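
A minimal sketch of that idea, assuming the producer uses Confluent's Schema Registry wire format (one magic byte plus a 4-byte schema id in front of the Avro-encoded body); the helper name and the fixed header length are illustrative, not part of the original code:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Skip the 5-byte Confluent header (1 magic byte + 4-byte schema id),
// then hand the remaining bytes to the ordinary Avro binary decoder.
def decodeConfluentAvro(schema: Schema, value: Array[Byte]): GenericRecord = {
  val headerLength = 5
  val reader = new GenericDatumReader[GenericRecord](schema)
  val binDecoder = DecoderFactory.get()
    .binaryDecoder(value, headerLength, value.length - headerLength, null)
  reader.read(null, binDecoder)
}

Confluent's KafkaAvroDecoder (or the newer KafkaAvroDeserializer) can instead be instantiated inside the map to look the writer schema up from the registry by the embedded id rather than hard-coding it.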


RE: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

Posted by Revin Chalil <rc...@expedia.com>.
Just following up on this; would appreciate any responses. Thanks.
