Posted to user@spark.apache.org by Alex Nastetsky <al...@verve.com> on 2020/05/12 21:28:39 UTC
to_avro/from_avro inserts extra values from Kafka
Hi all,
I create a DataFrame, convert it to Avro with to_avro, and write it to
Kafka.
Then I read it back out with from_avro.
(I'm not using Schema Registry.)
The problem is that the values come back shifted: every other column in the
result is empty, and the real values land one field over.
I expect:
+---------+--------+------+-------+
|firstName|lastName| color|   mood|
+---------+--------+------+-------+
|     Suzy|  Samson|  blue|grimmer|
|      Jim| Johnson|indigo|   grim|
+---------+--------+------+-------+
Instead I get:
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+
Here's what I'm doing --
$ kt admin -createtopic persons-avro-spark9 -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)
$ cat person.avsc
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}
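As a sanity check on the wire format: under this schema, each record should
serialize as four back-to-back strings, each a zigzag-varint length followed
by the UTF-8 bytes, with nothing in between. Here's a quick sketch that
prints the expected bytes with plain Avro (runnable in the spark-shell below,
which has avro on the classpath since spark-avro pulls it in):

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.EncoderFactory

val personSchema = new Schema.Parser().parse(new java.io.File("person.avsc"))
val rec = new GenericData.Record(personSchema)
rec.put("firstName", "Suzy")
rec.put("lastName", "Samson")
rec.put("color", "blue")
rec.put("mood", "grimmer")

val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericData.Record](personSchema).write(rec, encoder)
encoder.flush()
// Prints 08 53 75 7a 79 0c ... -- a length byte (0x08 = zigzag 4), then
// "Suzy", and so on, with no 00 bytes anywhere.
println(out.toByteArray.map(b => f"$b%02x").mkString(" "))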
$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths
val topic = "persons-avro-spark9"
// `from_avro` requires the Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("person.avsc")))
val personDF = sc.parallelize(Seq(
("Jim","Johnson","indigo","grim"),
("Suzy","Samson","blue","grimmer")
)).toDF("firstName","lastName","color","mood")
personDF.select(to_avro(struct(personDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", topic)
  .option("avroSchema", jsonFormatSchema)
  .save()
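// Note: in Spark 2.4, to_avro takes no schema argument and derives its
// writer schema from the Catalyst schema of its input column. I suspect the
// "avroSchema" option above is silently ignored by the Kafka sink, but I
// haven't verified that.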
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro('value, jsonFormatSchema) as 'person)
  .select($"person.firstName", $"person.lastName", $"person.color", $"person.mood")
  .writeStream
  .format("console")
  .start()
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths
topic: String = persons-avro-spark9
jsonFormatSchema: String =
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}
personDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName: string ... 2 more fields]
stream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3990c36c
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+
See the raw bytes:
$ kt consume -topic persons-avro-spark9
{
  "partition": 0,
  "offset": 0,
  "key": null,
  "value": "\u0000\u0008Suzy\u0000\u000cSamson\u0000\u0008blue\u0000\u000egrimmer",
  "timestamp": "2020-05-12T17:18:53.858-04:00"
}
{
  "partition": 0,
  "offset": 1,
  "key": null,
  "value": "\u0000\u0006Jim\u0000\u000eJohnson\u0000\u000cindigo\u0000\u0008grim",
  "timestamp": "2020-05-12T17:18:53.859-04:00"
}
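Compared with the expected bytes above, there's an extra \u0000 in front of
every field. My working theory (unverified): toDF() marks all four columns
nullable, and since to_avro derives its writer schema from the Catalyst
schema, each field was actually written as a ["string", "null"] union -- the
\u0000 is the union branch index (0 = string). from_avro with person.avsc,
whose strings are not nullable, then reads each \u0000 as a zero-length
string, and everything shifts over by one field, which is exactly the output
above. If that's right, deriving the reader schema from the same DataFrame
should line up; a sketch (SchemaConverters ships with spark-avro):

import org.apache.spark.sql.avro.SchemaConverters
// Nullable columns become ["string","null"] unions here, matching the writer.
val readerSchema = SchemaConverters.toAvroType(personDF.schema).toString
val stream2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro('value, readerSchema) as 'person)
  .select($"person.*")
  .writeStream
  .format("console")
  .start()

(Hand-editing person.avsc so each field's type is ["string", "null"] should
be equivalent.)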
Thanks,
Alex.