You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by karthikjay <as...@gmail.com> on 2018/05/23 05:54:22 UTC

[Beginner][StructuredStreaming] Console sink is not working as expected

I have the following code, which reads and processes Kafka data using Structured
Streaming:

  
	object ETLTest {

	  import org.apache.spark.sql.{ForeachWriter, SparkSession}

	  /** One Kafka message, reduced to the fields this job reads. */
	  case class record(value: String, topic: String)

	  def main(args: Array[String]): Unit =
	    run()

	  /**
	   * Streams messages from Kafka starting at the earliest offset, drops
	   * messages whose value is null, and prints the rest. Blocks until the
	   * streaming query terminates.
	   *
	   * @param useConsoleSink if true (the default), write with the built-in
	   *                       console sink; otherwise print each record
	   *                       through a ForeachWriter
	   */
	  def run(useConsoleSink: Boolean = true): Unit = {

	    val spark = SparkSession
	      .builder
	      .appName("Test JOB")
	      .master("local[*]")
	      .getOrCreate()

	    // Provides the Encoder required by .as[record] and the $"col" column
	    // syntax used below; the Dataset code does not compile without it.
	    import spark.implicits._

	    // NOTE(review): Spark's default INFO logging in an IDE run can bury the
	    // console sink's per-batch tables; lowering it makes them easy to spot.
	    spark.sparkContext.setLogLevel("WARN")

	    val kafkaStreamingDF = spark
	      .readStream
	      .format("kafka")
	      .option("kafka.bootstrap.servers", "...")
	      .option("subscribe", "...")
	      .option("failOnDataLoss", "false")
	      .option("startingOffsets", "earliest")
	      .load()
	      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)", "CAST(topic as STRING)")

	    // Prints every row handed to it by the foreach sink.
	    val sdvWriter = new ForeachWriter[record] {
	      override def open(partitionId: Long, version: Long): Boolean = true
	      override def process(rec: record): Unit = println(s"record:: $rec")
	      override def close(errorOrNull: Throwable): Unit = ()
	    }

	    // .as[record] only changes the typed view of the rows; it does not
	    // project away the extra timestamp column, so the console sink still
	    // shows it.
	    val sdvDF = kafkaStreamingDF
	      .as[record]
	      .filter($"value".isNotNull)

	    val writer = sdvDF.writeStream
	    val sink =
	      if (useConsoleSink)
	        // Show full Kafka values instead of truncating them to 20 characters.
	        writer.format("console").option("truncate", "false")
	      else
	        writer.foreach(sdvWriter)

	    // start() returns the StreamingQuery handle; awaitTermination() returns
	    // Unit, so keep the handle in its own val.
	    val query = sink.start()
	    query.awaitTermination()
	  }

	}

I am running this code from the IntelliJ IDEA IDE. When I use
foreach(sdvWriter), I can see the records consumed from Kafka, but when I
use .writeStream.format("console"), I do not see any records. I assume that
the console sink maintains some sort of checkpoint and believes it has
already processed all the records. Is that the case? Am I missing something
obvious here?




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org