You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Dominik Safaric <do...@gmail.com> on 2016/02/25 17:27:01 UTC

Spark Streaming - processing/transforming DStreams using a custom Receiver

Recently, I've implemented the following Receiver and custom Spark Streaming
InputDStream using Scala:

/**
 * The GitHubUtils object declares an interface consisting of overloaded
createStream
 * functions. The createStream function takes as arguments the ctx :
StreamingContext
 * passed by the driver program, along with the storageLevel : StorageLevel,
returning
 * a GitHubInputDStream. Whereas the GitHubInputDStream is a DStream
representation,
 * i.e. a derivation of the abstract class ReceivedInputDStream.
*/

object GitHubUtils{

  def createStream(ctx : StreamingContext, storageLevel: StorageLevel) :
ReceiverInputDStream[Event] = new GitHubInputDStream(ctx,storageLevel)

}

/**
 * The GitHubInputDStream class takes as constructor arguments a ctx :
StreamingContext,
 * and a storageLevel : StorageLevel. The class inherits from the
ReceiverInputDStream
 * abstract class declared within SparkStreaming. In summary, the
GitHubInputDStream
 * is a DStream representation of GitHub events, implementing i.e.
overriding the
 * getReceiver() function that returns a Receiver[Event] object.
*/

private[streaming]
class GitHubInputDStream(ctx : StreamingContext, storageLevel: StorageLevel)
extends ReceiverInputDStream[Event](ctx) {

 def getReceiver() : Receiver[Event] = new GitHubReceiver(storageLevel,
Client)

}

/**
 * The GitHubReceiver class takes as a constructor argument a storageLevel :
StorageLevel.
 * It implements i.e. overrides two functions declared by the Receiver
interface, notably
 * onStart() and onStop(). As the names imply, the onStart() function is
executed
 * when creating DStreams, i.e. within a specified batch interval. However,
the onStart().
*/

private[streaming]
class GitHubReceiver(storageLevel: StorageLevel, client : GitHubClient)
extends Receiver[Event](storageLevel) with Logging {

  def onStart(): Unit = {
    consumeEvents(new EventService(client).pagePublicEvents(0,300))
}

 def consumeEvents(iterator: PageIterator[Event]) :Unit = iterator.hasNext
match{
    case true => iterator.next.toList.foreach{event => store(event)};
consumeEvents(iterator)
    case false => logInfo("Processing is stopping")
}

def onStop(): Unit = {

}

However, then initialised i.e. created in the driver program on my local
machine and applied a series of functions like e.g. flatMap on a
DStream[Event]:

val configuration = new
SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val streamingContext = new StreamingContext(configuration, Seconds(5))

val stream = GitHubUtils.createStream(streamingContext,
StorageLevel.MEMORY_AND_DISK_SER)

val timestamps = stream.flatMap(event => event.getCreatedAt.toString)

and then applied a series of functions such as reduceByKey that would allow
me to count e.g. the number of events per second, I get the following
output:

(T,100)
(d,100)
(4,100)
(8,13)
(6,114)
(0,366)

While the output should be in the form of e.g.:

(2016-26-02 00:00:01,100)
(2016-26-02 00:00:02,100)
(2016-26-02 00:00:03,100)
(2016-26-02 00:00:04,13)
(2016-26-02 00:00:05,114)
(2016-26-02 00:00:06,366)

where K = Char. The root of the problem is that when flatMap is applied to
an event that is a serialisable object containing a member variable
getCreatedAt : Date, rather then producing a DStream[String] it produces a
DStream[Char] - meaning that Spark somehow splits the date String using some
delimiter.

I've also tried to collect and perform the computation on timestamps using
first foreachRDD on the DStream of events, and then using collect to get the
full String representation of the date - and then it works. However, since
collect can be quite expensive, I am simply trying to avoid it and hence
think that there must be a better solution to this.

Therefore, my questions are: how exactly do a create from the DStream[Event]
a DStream[String] (instead of DStream[Char]), where each string in the
DStream represents a timestamp from a RDD? Secondly, can someone give some
good examples of this? And thirdly, which functions is at best to use if I
would like to e.g. aggregate all events per repository ID. I.e. each Event
object contains a getRepository() function that returns the ID : Long of the
GitHub repository, and then on each streamed event belonging to a certain
repository, I would like to map it to its corresponding repository ID in the
form of (Long, [Event]).

Thanks in advance!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-processing-transforming-DStreams-using-a-custom-Receiver-tp26336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark Streaming - processing/transforming DStreams using a custom Receiver

Posted by Bryan Cutler <cu...@gmail.com>.
Using flatmap on a string will treat it as a sequence, which is why you are
getting an RDD of char.  I think you want to just do a map instead.  Like
this

val timestamps = stream.map(event => event.getCreatedAt.toString)
On Feb 25, 2016 8:27 AM, "Dominik Safaric" <do...@gmail.com> wrote:

> Recently, I've implemented the following Receiver and custom Spark
> Streaming
> InputDStream using Scala:
>
> /**
>  * The GitHubUtils object declares an interface consisting of overloaded
> createStream
>  * functions. The createStream function takes as arguments the ctx :
> StreamingContext
>  * passed by the driver program, along with the storageLevel :
> StorageLevel,
> returning
>  * a GitHubInputDStream. Whereas the GitHubInputDStream is a DStream
> representation,
>  * i.e. a derivation of the abstract class ReceivedInputDStream.
> */
>
> object GitHubUtils{
>
>   def createStream(ctx : StreamingContext, storageLevel: StorageLevel) :
> ReceiverInputDStream[Event] = new GitHubInputDStream(ctx,storageLevel)
>
> }
>
> /**
>  * The GitHubInputDStream class takes as constructor arguments a ctx :
> StreamingContext,
>  * and a storageLevel : StorageLevel. The class inherits from the
> ReceiverInputDStream
>  * abstract class declared within SparkStreaming. In summary, the
> GitHubInputDStream
>  * is a DStream representation of GitHub events, implementing i.e.
> overriding the
>  * getReceiver() function that returns a Receiver[Event] object.
> */
>
> private[streaming]
> class GitHubInputDStream(ctx : StreamingContext, storageLevel:
> StorageLevel)
> extends ReceiverInputDStream[Event](ctx) {
>
>  def getReceiver() : Receiver[Event] = new GitHubReceiver(storageLevel,
> Client)
>
> }
>
> /**
>  * The GitHubReceiver class takes as a constructor argument a storageLevel
> :
> StorageLevel.
>  * It implements i.e. overrides two functions declared by the Receiver
> interface, notably
>  * onStart() and onStop(). As the names imply, the onStart() function is
> executed
>  * when creating DStreams, i.e. within a specified batch interval. However,
> the onStart().
> */
>
> private[streaming]
> class GitHubReceiver(storageLevel: StorageLevel, client : GitHubClient)
> extends Receiver[Event](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     consumeEvents(new EventService(client).pagePublicEvents(0,300))
> }
>
>  def consumeEvents(iterator: PageIterator[Event]) :Unit = iterator.hasNext
> match{
>     case true => iterator.next.toList.foreach{event => store(event)};
> consumeEvents(iterator)
>     case false => logInfo("Processing is stopping")
> }
>
> def onStop(): Unit = {
>
> }
>
> However, then initialised i.e. created in the driver program on my local
> machine and applied a series of functions like e.g. flatMap on a
> DStream[Event]:
>
> val configuration = new
> SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
> val streamingContext = new StreamingContext(configuration, Seconds(5))
>
> val stream = GitHubUtils.createStream(streamingContext,
> StorageLevel.MEMORY_AND_DISK_SER)
>
> val timestamps = stream.flatMap(event => event.getCreatedAt.toString)
>
> and then applied a series of functions such as reduceByKey that would allow
> me to count e.g. the number of events per second, I get the following
> output:
>
> (T,100)
> (d,100)
> (4,100)
> (8,13)
> (6,114)
> (0,366)
>
> While the output should be in the form of e.g.:
>
> (2016-26-02 00:00:01,100)
> (2016-26-02 00:00:02,100)
> (2016-26-02 00:00:03,100)
> (2016-26-02 00:00:04,13)
> (2016-26-02 00:00:05,114)
> (2016-26-02 00:00:06,366)
>
> where K = Char. The root of the problem is that when flatMap is applied to
> an event that is a serialisable object containing a member variable
> getCreatedAt : Date, rather then producing a DStream[String] it produces a
> DStream[Char] - meaning that Spark somehow splits the date String using
> some
> delimiter.
>
> I've also tried to collect and perform the computation on timestamps using
> first foreachRDD on the DStream of events, and then using collect to get
> the
> full String representation of the date - and then it works. However, since
> collect can be quite expensive, I am simply trying to avoid it and hence
> think that there must be a better solution to this.
>
> Therefore, my questions are: how exactly do a create from the
> DStream[Event]
> a DStream[String] (instead of DStream[Char]), where each string in the
> DStream represents a timestamp from a RDD? Secondly, can someone give some
> good examples of this? And thirdly, which functions is at best to use if I
> would like to e.g. aggregate all events per repository ID. I.e. each Event
> object contains a getRepository() function that returns the ID : Long of
> the
> GitHub repository, and then on each streamed event belonging to a certain
> repository, I would like to map it to its corresponding repository ID in
> the
> form of (Long, [Event]).
>
> Thanks in advance!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-processing-transforming-DStreams-using-a-custom-Receiver-tp26336.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>