You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Thakrar, Jayesh" <jt...@conversantmedia.com> on 2018/02/21 03:27:09 UTC
Spark Streaming Custom Receiver Anomaly
Hi All,
I am trying to "test" a very simple custom receiver and am a little puzzled.
Using Spark 2.2.0 shell on my laptop, I am running the code below.
I was expecting the code to timeout since my timeout wait period is 1 ms and I have a sleep in the class that is much more (1200 ms).
Is this normal? Or am I interpreting something incorrectly?
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming._
class CustomReceiver extends org.apache.spark.streaming.receiver.Receiver[String](org.apache.spark.storage.StorageLevel.MEMORY_ONLY) {
def onStart() {
new Thread("CustomReceiver") {
override def run() { receive() }
}.start()
}
def onStop() {}
private def receive() {
val hostname = java.net.InetAddress.getLocalHost()
val time = java.util.Calendar.getInstance.getTime
var counter = 0
while (isStarted && !isStopped) {
counter += 1
store(s"host = ${hostname} time = ${time} counter = ${counter}")
Thread.sleep(1200)
}
}
}
val ssc = new StreamingContext(sc, Seconds(1))
val words = ssc.receiverStream(new CustomReceiver())
words.print()
ssc.start()
ssc.awaitTerminationOrTimeout(1)