You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Cassa L <lc...@gmail.com> on 2016/11/02 17:23:23 UTC

Custom receiver for WebSocket in Spark not working

Hi,
I am using spark 1.6. I wrote a custom receiver to read from WebSocket. But
when I start my spark job, it  connects to the WebSocket but  doesn't get
any message. Same code, if I write as separate scala class, it works and
prints messages from WebSocket. Is anything missing in my Spark Code? There
are no errors in spark console.

Here is my receiver -

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}

/**
  * Custom receiver for WebSocket
  */
class WebSocketReceiver extends
Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging
{

  private var webSocket: WebSocket = _

  @transient
  private var thread: Thread = _

  override def onStart(): Unit = {
    thread = new Thread(this)
    thread.start()
  }

  override def onStop(): Unit = {
    setWebSocket(null)
    thread.interrupt()
  }

  override def run(): Unit = {
    println("Received ----")
    receive()
  }

  private def receive(): Unit = {


    val connection = WebSocket().open("ws://localhost:3001")
    println("WebSocket  Connected ..." )
    println("Connected ------- " + connection)
    setWebSocket(connection)

   connection.listener(new TextListener {

         override def onMessage(message: String) {
                 System.out.println("Message in Spark client is --> " + message)
           }
    })


}

private def setWebSocket(newWebSocket: WebSocket) = synchronized {
if (webSocket != null) {
webSocket.shutDown
}
webSocket = newWebSocket
}

}


=====

Here is code for Spark job


object WebSocketTestApp {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Test Web Socket")
      .setMaster("local[20]")
      .set("test", "")
    val ssc = new StreamingContext(conf, Seconds(5))


    val stream: ReceiverInputDStream[String] = ssc.receiverStream(new
WebSocketReceiver())
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }


==============
}


Thanks,

LCassa

Re: Custom receiver for WebSocket in Spark not working

Posted by kant kodali <ka...@gmail.com>.
I don't see a store() call in your receive().

Search for store() in here http://spark.apache.org/
docs/latest/streaming-custom-receivers.html

On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lc...@gmail.com> wrote:

> Hi,
> I am using spark 1.6. I wrote a custom receiver to read from WebSocket.
> But when I start my spark job, it  connects to the WebSocket but  doesn't
> get any message. Same code, if I write as separate scala class, it works
> and prints messages from WebSocket. Is anything missing in my Spark Code?
> There are no errors in spark console.
>
> Here is my receiver -
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
>   * Custom receiver for WebSocket
>   */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {
>
>   private var webSocket: WebSocket = _
>
>   @transient
>   private var thread: Thread = _
>
>   override def onStart(): Unit = {
>     thread = new Thread(this)
>     thread.start()
>   }
>
>   override def onStop(): Unit = {
>     setWebSocket(null)
>     thread.interrupt()
>   }
>
>   override def run(): Unit = {
>     println("Received ----")
>     receive()
>   }
>
>   private def receive(): Unit = {
>
>
>     val connection = WebSocket().open("ws://localhost:3001")
>     println("WebSocket  Connected ..." )
>     println("Connected ------- " + connection)
>     setWebSocket(connection)
>
>    connection.listener(new TextListener {
>
>          override def onMessage(message: String) {
>                  System.out.println("Message in Spark client is --> " + message)
>            }
>     })
>
>
> }
>
> private def setWebSocket(newWebSocket: WebSocket) = synchronized {
> if (webSocket != null) {
> webSocket.shutDown
> }
> webSocket = newWebSocket
> }
>
> }
>
>
> =====
>
> Here is code for Spark job
>
>
> object WebSocketTestApp {
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Test Web Socket")
>       .setMaster("local[20]")
>       .set("test", "")
>     val ssc = new StreamingContext(conf, Seconds(5))
>
>
>     val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver())
>     stream.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>
> ==============
> }
>
>
> Thanks,
>
> LCassa
>
>

Re: Custom receiver for WebSocket in Spark not working

Posted by Kant Kodali <ka...@peernova.com>.
I don't see a store() call in your receive().

Search for store() in here
http://spark.apache.org/docs/latest/streaming-custom-receivers.html

On Wed, Nov 2, 2016 at 10:23 AM, Cassa L <lc...@gmail.com> wrote:

> Hi,
> I am using spark 1.6. I wrote a custom receiver to read from WebSocket.
> But when I start my spark job, it  connects to the WebSocket but  doesn't
> get any message. Same code, if I write as separate scala class, it works
> and prints messages from WebSocket. Is anything missing in my Spark Code?
> There are no errors in spark console.
>
> Here is my receiver -
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
>   * Custom receiver for WebSocket
>   */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {
>
>   private var webSocket: WebSocket = _
>
>   @transient
>   private var thread: Thread = _
>
>   override def onStart(): Unit = {
>     thread = new Thread(this)
>     thread.start()
>   }
>
>   override def onStop(): Unit = {
>     setWebSocket(null)
>     thread.interrupt()
>   }
>
>   override def run(): Unit = {
>     println("Received ----")
>     receive()
>   }
>
>   private def receive(): Unit = {
>
>
>     val connection = WebSocket().open("ws://localhost:3001")
>     println("WebSocket  Connected ..." )
>     println("Connected ------- " + connection)
>     setWebSocket(connection)
>
>    connection.listener(new TextListener {
>
>          override def onMessage(message: String) {
>                  System.out.println("Message in Spark client is --> " + message)
>            }
>     })
>
>
> }
>
> private def setWebSocket(newWebSocket: WebSocket) = synchronized {
> if (webSocket != null) {
> webSocket.shutDown
> }
> webSocket = newWebSocket
> }
>
> }
>
>
> =====
>
> Here is code for Spark job
>
>
> object WebSocketTestApp {
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Test Web Socket")
>       .setMaster("local[20]")
>       .set("test", "")
>     val ssc = new StreamingContext(conf, Seconds(5))
>
>
>     val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver())
>     stream.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>
> ==============
> }
>
>
> Thanks,
>
> LCassa
>
>