You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by karthikjay <as...@gmail.com> on 2018/05/23 05:54:22 UTC
[Beginner][StructuredStreaming] Console sink is not working as
expected
I have the following code to read and process Kafka data using Structured
Streaming
object ETLTest {
case class record(value: String, topic: String)
def main(args: Array[String]): Unit = {
run();
}
def run(): Unit = {
val spark = SparkSession
.builder
.appName("Test JOB")
.master("local[*]")
.getOrCreate()
val kafkaStreamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("subscribe", "...")
.option("failOnDataLoss", "false")
.option("startingOffsets","earliest")
.load()
.selectExpr("CAST(value as STRING)", "CAST(timestamp as
STRING)","CAST(topic as STRING)")
val sdvWriter = new ForeachWriter[record] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(record: record) = {
println("record:: " + record)
}
def close(errorOrNull: Throwable): Unit = {}
}
val sdvDF = kafkaStreamingDF
.as[record]
.filter($"value".isNotNull)
// DOES NOT WORK
/*val query = sdvDF
.writeStream
.format("console")
.start()
.awaitTermination()*/
// WORKS
/*val query = sdvDF
.writeStream
.foreach(sdvWriter)
.start()
.awaitTermination()
*/
}
}
I am running this code from IntellijIdea IDE and when I use the
foreach(sdvWriter), I could see the records consumed from Kafka, but when I
use .writeStream.format("console") I do not see any records. I assume that
the console write stream is maintaining some sort of checkpoint and assumes
it has processed all the records. Is that the case ? Am I missing something
obvious here ?
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org