Posted to user@spark.apache.org by OlegYch <ol...@gmail.com> on 2014/02/02 16:09:42 UTC

PageView streaming sample lost page views

Hi,
I've tried running
org.apache.spark.streaming.examples.clickstream.PageViewStream (built from
master at https://github.com/apache/incubator-spark ) and I'm seeing only a
fraction of the generated events being processed by the stream.
E.g. if I start the generator with 1000 events per second and add
textStream.saveAsTextFiles("/spark/teststream"), I only see about 100
events after a while, most of the part files are empty, and pageCounts is
very low compared with what the generator produces.
What might be the issue?

Aleh



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PageView-streaming-sample-lost-page-views-tp1126.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: PageView streaming sample lost page views

Posted by dachuan <hd...@gmail.com>.
Thanks :)

I found the Flume, Kafka, Twitter, ZeroMQ examples confusing because they
need a third-party product which I have no clue about.

Please let me know if you have experience with any of these. I am
currently collecting all sorts of streaming apps for my research, so
please feel free to discuss them with me :)


On Sun, Feb 2, 2014 at 7:59 PM, OlegYch <ol...@gmail.com> wrote:

> Thanks, I'll look into that.
> As for Kafka, I've just used the simplest configuration. You can create it
> using their quickstart and code like this for the consumer:
>     val kafkaParams = Map[String, String](
>       "zookeeper.connect" -> "localhost:2181",
>       "group.id" -> "test-consumer-group1",
>       "zookeeper.connection.timeout.ms" -> "10000",
>       "auto.offset.reset" -> "smallest")
>
>     val textStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
>
> and like this for producer
>
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
>       Array("localhost:9092", "test", "10", "10")
>
>     // Kafka broker connection properties
>     val props = new Properties()
>     props.put("metadata.broker.list", brokers)
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val config = new ProducerConfig(props)
>     val producer = new Producer[String, String](config)
>
>     // Send some messages
>     while(true) {
>       producer.send(new KeyedMessage(topic, getNextClickEvent()))
>       Thread.sleep(10)
>     }
>
> (I took that from
> org.apache.spark.streaming.examples.clickstream.PageViewGenerator,
> org.apache.spark.streaming.examples.clickstream.PageViewStream and
> org.apache.spark.streaming.examples.KafkaWordCount)
>
>
>
>



-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210

Re: PageView streaming sample lost page views

Posted by OlegYch <ol...@gmail.com>.
Thanks, I'll look into that.
As for Kafka, I've just used the simplest configuration. You can create it
using their quickstart and code like this for the consumer:
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group1",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

    val textStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

and like this for producer

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
      Array("localhost:9092", "test", "10", "10")

    // Kafka broker connection properties
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Send some messages
    while(true) {
      producer.send(new KeyedMessage(topic, getNextClickEvent()))
      Thread.sleep(10)
    }

(I took that from
org.apache.spark.streaming.examples.clickstream.PageViewGenerator,
org.apache.spark.streaming.examples.clickstream.PageViewStream and
org.apache.spark.streaming.examples.KafkaWordCount)
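
Note that the producer loop above sleeps a fixed 10 ms, so it sends at most about 100 messages per second and the messagesPerSec value is never used. A minimal plain-Scala sketch of deriving the delay from the target rate follows. Throttle, delayMillis, and run are hypothetical names introduced here, and the send callback only stands in for producer.send(...); nothing in it talks to Kafka.

```scala
object Throttle {
  // Delay between sends, in milliseconds, for a target rate.
  // Integer division makes this approximate; the delay is never below 1 ms.
  def delayMillis(messagesPerSec: Int): Long = {
    require(messagesPerSec > 0, "messagesPerSec must be positive")
    math.max(1L, 1000L / messagesPerSec)
  }

  // Calls `send` `total` times at roughly the requested rate.
  // `send` is a stand-in for producer.send(new KeyedMessage(topic, ...)).
  def run(messagesPerSec: Int, total: Int)(send: Int => Unit): Unit = {
    val delay = delayMillis(messagesPerSec)
    (0 until total).foreach { i =>
      send(i)
      Thread.sleep(delay)
    }
  }

  def main(args: Array[String]): Unit = {
    // Three dummy sends at about 10 messages per second.
    run(messagesPerSec = 10, total = 3)(i => println(s"sent message $i"))
  }
}
```

With messagesPerSec set to 10 this waits about 100 ms between sends, which matches the value in the Array above rather than the hard-coded sleep.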




Re: PageView streaming sample lost page views

Posted by dachuan <hd...@gmail.com>.
You might get help by reading the StatefulNetworkWordCount example and
the StateDStream implementation.
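
To aggregate across batches, a running total per key is usually kept with a stateful update function. Below is a minimal plain-Scala sketch of that idea, not code from the Spark examples. updateCount has the (Seq[V], Option[S]) => Option[S] shape that DStream.updateStateByKey takes; the step function, the object name, and the sample batches are made up here to simulate three batches without a cluster. In the real job the assumption is that you would set a checkpoint directory of your choice with ssc.checkpoint(...) and then call pageCounts.updateStateByKey[Long](updateCount _).

```scala
object RunningCountsDemo {
  // Same shape as the function passed to updateStateByKey:
  // (values seen for a key in this batch, previous state) => new state.
  def updateCount(newCounts: Seq[Long], running: Option[Long]): Option[Long] =
    Some(running.getOrElse(0L) + newCounts.sum)

  // Simulates one batch: applies updateCount to every key that is either
  // already in the state or present in the new batch.
  def step(state: Map[String, Long], batch: Seq[(String, Long)]): Map[String, Long] = {
    val grouped: Map[String, Seq[Long]] =
      batch.groupBy(_._1).map { case (url, pairs) => url -> pairs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { url =>
      updateCount(grouped.getOrElse(url, Seq.empty), state.get(url)).map(url -> _)
    }.toMap
  }

  // Folds a sequence of batches into the final running totals.
  def totals(batches: Seq[Seq[(String, Long)]]): Map[String, Long] =
    batches.foldLeft(Map.empty[String, Long])(step)

  def main(args: Array[String]): Unit = {
    // Made-up per-batch page counts, standing in for pageCounts output.
    val batches = Seq(
      Seq("/index" -> 3L, "/about" -> 1L),
      Seq("/index" -> 2L),
      Seq("/about" -> 4L))
    println(totals(batches))
  }
}
```

Keys that do not appear in a batch keep their previous total, which is the difference from the per-batch pageCounts.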

By the way, could you please briefly describe your Kafka configuration,
for example, how do you find a data source? I am new to Kafka.


On Sun, Feb 2, 2014 at 3:26 PM, OlegYch <ol...@gmail.com> wrote:

> I've replaced socketStream with Kafka and it seems to catch and store all
> messages now. So I guess it's a problem with either the sample
> PageViewGenerator
> or socketTextStream.
> Anyway, I see that pageCounts only contains counts from the last batch.  Is
> there a way to aggregate across all batches?
>
>
>
>




Re: PageView streaming sample lost page views

Posted by OlegYch <ol...@gmail.com>.
I've replaced socketStream with Kafka and it seems to catch and store all
messages now. So I guess it's a problem with either the sample
PageViewGenerator or socketTextStream.
Anyway, I see that pageCounts only contains counts from the last batch.  Is
there a way to aggregate across all batches?




Re: PageView streaming sample lost page views

Posted by dachuan <hd...@gmail.com>.
I have tried some workloads where only the first partition is computed,
because the first partition suffices to provide the ten rows.

In your case, you have called saveAsTextFiles, so all partitions should be
computed.

So I don't know the root cause yet. I just wanted to point out that some
workloads seem to be toy workloads that call DStream.print() at the end.
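
As a plain-Scala analogy for the point about previewing ten rows (this is not Spark code, and the names LazyPreviewDemo and evaluatedBy are made up for illustration): a lazy source only computes the elements that the consumer actually asks for, so a ten-element preview touches far less data than a full pass such as a save.

```scala
object LazyPreviewDemo {
  // Builds a lazy source of `total` events, lets `consume` drain part or all
  // of it, and returns how many events were actually computed.
  def evaluatedBy(total: Int)(consume: Iterator[String] => Unit): Int = {
    var evaluated = 0
    val events = Iterator.tabulate(total) { i =>
      evaluated += 1
      s"event-$i"
    }
    consume(events)
    evaluated
  }

  def main(args: Array[String]): Unit = {
    // Preview: only the first ten elements are requested.
    val previewed = evaluatedBy(1000)(_.take(10).foreach(_ => ()))
    // Full pass: every element is requested, as a save would do.
    val full = evaluatedBy(1000)(_.foreach(_ => ()))
    println(s"preview computed $previewed events, full pass computed $full")
  }
}
```

This is why a print() at the end of a job says little about how much of the stream was processed, while a save or a count covers everything.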


On Sun, Feb 2, 2014 at 10:18 AM, OlegYch <ol...@gmail.com> wrote:

> Do you mean the popularUsersSeen metric? Because otherwise all the other
> streams are supposed to operate on the whole stream, as far as I can see.
>
>
>
>




Re: PageView streaming sample lost page views

Posted by OlegYch <ol...@gmail.com>.
Do you mean the popularUsersSeen metric? Because otherwise all the other
streams are supposed to operate on the whole stream, as far as I can see.




Re: PageView streaming sample lost page views

Posted by dachuan <hd...@gmail.com>.
Is it because this workload only prints the first ten rows of the final
RDD?
On Feb 2, 2014 10:10 AM, "OlegYch" <ol...@gmail.com> wrote:

> Hi,
> I've tried running
> org.apache.spark.streaming.examples.clickstream.PageViewStream (built from
> master at https://github.com/apache/incubator-spark ) and I'm seeing only a
> fraction of the generated events being processed by the stream.
> E.g. if I start the generator with 1000 events per second and add
> textStream.saveAsTextFiles("/spark/teststream"), I only see about 100
> events after a while, most of the part files are empty, and pageCounts is
> very low compared with what the generator produces.
> What might be the issue?
>
> Aleh
>
>
>
>