Posted by karthikjay on 2018/05/23 05:54:22 UTC

[Beginner][StructuredStreaming] Console sink is not working as expected

I have the following code to read and process Kafka data using Structured
Streaming:

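	import org.apache.spark.sql.{ForeachWriter, SparkSession}

	object ETLTest {

	  case class record(value: String, topic: String)

	  def main(args: Array[String]): Unit = {
	    run()
	  }

	  def run(): Unit = {

	    // Lines marked "assumed" are missing from the archived post and are
	    // filled in from the standard Structured Streaming API.
	    val spark = SparkSession
	      .builder()                // assumed
	      .appName("Test JOB")
	      .getOrCreate()            // assumed; the master setting is not shown

	    import spark.implicits._    // assumed; needed for .as[record]

	    val kafkaStreamingDF = spark
	      .readStream               // assumed
	      .format("kafka")          // assumed
	      .option("kafka.bootstrap.servers", "...")
	      .option("subscribe", "...")
	      .option("failOnDataLoss", "false")
	      .load()                   // assumed
	      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)", "CAST(topic as STRING)")

	    // Prints every record it receives.
	    val sdvWriter = new ForeachWriter[record] {
	      def open(partitionId: Long, version: Long): Boolean = {
	        true                    // assumed
	      }
	      def process(record: record) = {
	        println("record:: " + record)
	      }
	      def close(errorOrNull: Throwable): Unit = {}
	    }

	    val sdvDF = kafkaStreamingDF
	      .as[record]               // assumed

	    // DOES NOT WORK (the calls after .format are assumed)
	    /*val query = sdvDF
	      .writeStream
	      .format("console")
	      .start()
	      .awaitTermination()*/

	    // WORKS (the calls after .foreach are assumed)
	    /*val query = sdvDF
	      .writeStream
	      .foreach(sdvWriter)
	      .start()
	      .awaitTermination()*/
	  }
	}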


I am running this code from the IntelliJ IDEA IDE. When I use
foreach(sdvWriter), I can see the records consumed from Kafka, but when I
use .writeStream.format("console") I do not see any records. I assume that
the console write stream is maintaining some sort of checkpoint and
considers all the records already processed. Is that the case? Am I missing
something obvious here?
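
One way to test the checkpoint assumption is sketched below. It is not a
confirmed fix, only a minimal sketch under stated assumptions: the query
reads the topic from the earliest offsets and writes its progress to a
freshly created checkpoint directory, so no earlier progress can be reused.
The object name ConsoleSinkCheck, the helper kafkaSourceOptions, the
local[*] master and the temporary directory are all hypothetical choices
made for illustration. The "..." placeholders are kept from the code above,
and the Kafka source connector is assumed to be on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object ConsoleSinkCheck {

  // Hypothetical helper: gathers the Kafka source options in one place.
  // "startingOffsets" = "earliest" asks a newly started query to read the
  // topic from the beginning instead of only records that arrive later.
  def kafkaSourceOptions(servers: String, topic: String): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> servers,
      "subscribe" -> topic,
      "startingOffsets" -> "earliest",
      "failOnDataLoss" -> "false"
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Console sink check")
      .master("local[*]") // assumption: local run from the IDE
      .getOrCreate()

    // Fresh, empty directory, so the query cannot resume from old progress.
    val checkpointDir =
      Files.createTempDirectory("console-sink-check").toString

    val df = spark.readStream
      .format("kafka")
      .options(kafkaSourceOptions("...", "...")) // placeholders as in the post
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)")

    val query = df.writeStream
      .format("console")
      .option("truncate", "false") // print full column values
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

If batches with rows appear on the console with these settings, the earlier
empty output was probably related to where the query started reading rather
than to the sink itself. If nothing appears, the cause is likely elsewhere.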
