You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Thakrar, Jayesh" <jt...@conversantmedia.com> on 2018/02/21 03:27:09 UTC

Spark Streaming Custom Receiver Anomaly

Hi All,

I am trying to "test" a very simple custom receiver and am a little puzzled.

Using Spark 2.2.0 shell on my laptop, I am running the code below.
I was expecting the code to time out, since my timeout wait period is 1 ms and the sleep in the receiver class is much longer (1200 ms).

Is this normal? Or am I interpreting something incorrectly?

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming._

/**
 * Minimal custom receiver that stores one descriptive string per iteration,
 * pausing 1200 ms after each record.
 *
 * The receiver is constructed with the `MEMORY_ONLY` storage level. Every record
 * contains the local host, a timestamp, and a counter that starts at 1.
 */
class CustomReceiver extends org.apache.spark.streaming.receiver.Receiver[String](org.apache.spark.storage.StorageLevel.MEMORY_ONLY) {

  /** Starts the receive loop on a separate thread named "CustomReceiver" and returns. */
  def onStart(): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = receive()
    }, "CustomReceiver")
    worker.start()
  }

  /** Intentionally empty: the loop in `receive` re-checks the receiver state before each record. */
  def onStop(): Unit = ()

  /** Stores a record and then sleeps, for as long as the receiver is started and not stopped. */
  private def receive(): Unit = {
    // Both values are evaluated once, before the loop, so every record repeats them.
    val hostname = java.net.InetAddress.getLocalHost()
    val time = java.util.Calendar.getInstance.getTime
    Iterator
      .from(1)
      .takeWhile(_ => isStarted && !isStopped)
      .foreach { counter =>
        store(s"host = ${hostname} time = ${time} counter = ${counter}")
        Thread.sleep(1200)
      }
  }
}

// Streaming context with a 1-second batch interval, built on `sc`
// (presumably the SparkContext provided by the Spark shell — not defined in this snippet).
val ssc = new StreamingContext(sc, Seconds(1))
// Input stream fed by the CustomReceiver class defined above.
val words = ssc.receiverStream(new CustomReceiver())

// Print the stream's contents, then start the streaming computation.
words.print()
ssc.start()
// NOTE(review): the result of this call is discarded. Per the StreamingContext API docs,
// the argument is a wait time in milliseconds and the call reports whether the context
// stopped within that time; it does not appear to stop the context itself, so streaming
// would keep running after it returns — confirm, and call ssc.stop() explicitly if a
// shutdown after the timeout is intended.
ssc.awaitTerminationOrTimeout(1)