You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Cassa L <lc...@gmail.com> on 2016/11/02 17:23:23 UTC
Custom receiver for WebSocket in Spark not working
Hi,
I am using spark 1.6. I wrote a custom receiver to read from WebSocket. But
when I start my spark job, it connects to the WebSocket but doesn't get
any message. Same code, if I write as separate scala class, it works and
prints messages from WebSocket. Is anything missing in my Spark Code? There
are no errors in spark console.
Here is my receiver -
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
/**
* Custom receiver for WebSocket
*/
class WebSocketReceiver extends
Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging
{
private var webSocket: WebSocket = _
@transient
private var thread: Thread = _
override def onStart(): Unit = {
thread = new Thread(this)
thread.start()
}
override def onStop(): Unit = {
setWebSocket(null)
thread.interrupt()
}
override def run(): Unit = {
println("Received ----")
receive()
}
private def receive(): Unit = {
val connection = WebSocket().open("ws://localhost:3001")
println("WebSocket Connected ..." )
println("Connected ------- " + connection)
setWebSocket(connection)
connection.listener(new TextListener {
override def onMessage(message: String) {
System.out.println("Message in Spark client is --> " + message)
}
})
}
private def setWebSocket(newWebSocket: WebSocket) = synchronized {
if (webSocket != null) {
webSocket.shutDown
}
webSocket = newWebSocket
}
}
=====
Here is code for Spark job
object WebSocketTestApp {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("Test Web Socket")
.setMaster("local[20]")
.set("test", "")
val ssc = new StreamingContext(conf, Seconds(5))
val stream: ReceiverInputDStream[String] = ssc.receiverStream(new
WebSocketReceiver())
stream.print()
ssc.start()
ssc.awaitTermination()
}
==============
}
Thanks,
LCassa
Re: Custom receiver for WebSocket in Spark not working
Posted by kant kodali <ka...@gmail.com>.
I don't see a store() call in your receive().
Search for store() in here http://spark.apache.org/
docs/latest/streaming-custom-receivers.html
On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lc...@gmail.com> wrote:
> Hi,
> I am using spark 1.6. I wrote a custom receiver to read from WebSocket.
> But when I start my spark job, it connects to the WebSocket but doesn't
> get any message. Same code, if I write as separate scala class, it works
> and prints messages from WebSocket. Is anything missing in my Spark Code?
> There are no errors in spark console.
>
> Here is my receiver -
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
> * Custom receiver for WebSocket
> */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {
>
> private var webSocket: WebSocket = _
>
> @transient
> private var thread: Thread = _
>
> override def onStart(): Unit = {
> thread = new Thread(this)
> thread.start()
> }
>
> override def onStop(): Unit = {
> setWebSocket(null)
> thread.interrupt()
> }
>
> override def run(): Unit = {
> println("Received ----")
> receive()
> }
>
> private def receive(): Unit = {
>
>
> val connection = WebSocket().open("ws://localhost:3001")
> println("WebSocket Connected ..." )
> println("Connected ------- " + connection)
> setWebSocket(connection)
>
> connection.listener(new TextListener {
>
> override def onMessage(message: String) {
> System.out.println("Message in Spark client is --> " + message)
> }
> })
>
>
> }
>
> private def setWebSocket(newWebSocket: WebSocket) = synchronized {
> if (webSocket != null) {
> webSocket.shutDown
> }
> webSocket = newWebSocket
> }
>
> }
>
>
> =====
>
> Here is code for Spark job
>
>
> object WebSocketTestApp {
>
> def main(args: Array[String]) {
> val conf = new SparkConf()
> .setAppName("Test Web Socket")
> .setMaster("local[20]")
> .set("test", "")
> val ssc = new StreamingContext(conf, Seconds(5))
>
>
> val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver())
> stream.print()
>
> ssc.start()
> ssc.awaitTermination()
> }
>
>
> ==============
> }
>
>
> Thanks,
>
> LCassa
>
>
Re: Custom receiver for WebSocket in Spark not working
Posted by Kant Kodali <ka...@peernova.com>.
I don't see a store() call in your receive().
Search for store() in here
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lc...@gmail.com> wrote:
> Hi,
> I am using spark 1.6. I wrote a custom receiver to read from WebSocket.
> But when I start my spark job, it connects to the WebSocket but doesn't
> get any message. Same code, if I write as separate scala class, it works
> and prints messages from WebSocket. Is anything missing in my Spark Code?
> There are no errors in spark console.
>
> Here is my receiver -
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
> * Custom receiver for WebSocket
> */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {
>
> private var webSocket: WebSocket = _
>
> @transient
> private var thread: Thread = _
>
> override def onStart(): Unit = {
> thread = new Thread(this)
> thread.start()
> }
>
> override def onStop(): Unit = {
> setWebSocket(null)
> thread.interrupt()
> }
>
> override def run(): Unit = {
> println("Received ----")
> receive()
> }
>
> private def receive(): Unit = {
>
>
> val connection = WebSocket().open("ws://localhost:3001")
> println("WebSocket Connected ..." )
> println("Connected ------- " + connection)
> setWebSocket(connection)
>
> connection.listener(new TextListener {
>
> override def onMessage(message: String) {
> System.out.println("Message in Spark client is --> " + message)
> }
> })
>
>
> }
>
> private def setWebSocket(newWebSocket: WebSocket) = synchronized {
> if (webSocket != null) {
> webSocket.shutDown
> }
> webSocket = newWebSocket
> }
>
> }
>
>
> =====
>
> Here is code for Spark job
>
>
> object WebSocketTestApp {
>
> def main(args: Array[String]) {
> val conf = new SparkConf()
> .setAppName("Test Web Socket")
> .setMaster("local[20]")
> .set("test", "")
> val ssc = new StreamingContext(conf, Seconds(5))
>
>
> val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver())
> stream.print()
>
> ssc.start()
> ssc.awaitTermination()
> }
>
>
> ==============
> }
>
>
> Thanks,
>
> LCassa
>
>