Posted to users@kafka.apache.org by Cassa L <lc...@gmail.com> on 2017/07/21 22:09:46 UTC
Fwd: Spark Structured Streaming - Spark Consumer does not display messages
Hi,
This is the first time I am trying Structured Streaming with Kafka. I have
simple code that reads from Kafka and displays the messages on the console.
Each message is in JSON format. However, when I run my code, nothing is
printed after the lines below.
17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092
(id: 2147483647 rack: null) for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinator XXX:9092
(id: 2147483647 rack: null) dead for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0
The code is:
    Dataset<Row> kafkaStream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers",
            config.getString("kafka.host") + ":" + config.getString("kafka.port"))
        .option("subscribe", "test")
        .load();
    //kafkaStream.printSchema();

    //JSON ::: {"id":1,"name":"MySelf"}
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("id", DataTypes.IntegerType, false),
        DataTypes.createStructField("name", DataTypes.StringType, false)});

    Dataset<KafkaMessage> streamingSelectDF =
        kafkaStream.selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"), schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(KafkaMessage.class));

    streamingSelectDF.createOrReplaceTempView("MyView");
    Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");

    StreamingQuery streamingQuery = streamData.writeStream()
        .format("console")
        .outputMode("complete")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();

    try {
        streamingQuery.awaitTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
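
The KafkaMessage class passed to Encoders.bean is not shown above. As a
minimal sketch only, assuming it simply mirrors the two fields of the JSON
schema (the field types and the Serializable marker are assumptions, not
the actual class), such a bean could look like this:

```java
import java.io.Serializable;

// Hypothetical bean mirroring the JSON {"id":1,"name":"MySelf"}.
// Assumption: the real KafkaMessage has exactly these two properties.
public class KafkaMessage implements Serializable {
    // Boxed type so that a missing value can be represented as null.
    private Integer id;
    private String name;

    // Bean encoders rely on a public no-argument constructor.
    public KafkaMessage() {
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

The property names (id, name) match the column names produced by
select("json.*"), which is how the bean encoder maps columns to setters.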
Regards,
Leena