You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aaron Kimball <ak...@gmail.com> on 2014/03/01 05:46:17 UTC

error in streaming word count API?

Hi folks,

I was trying to work through the streaming word count example at
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmland
couldn't get the code as-written to run. In fairness, I was trying to
do this inside the REPL rather than compiling a separate project; would the
types be different?

In any case, here's the code I ran:

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))

// *** The following code from the html page doesn't work
// because pairs has type DStream[(String, Int)] and
// there is no reduceByKey method on this type.

// Count each word in each batch
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no
reduceByKey()

// Print a few of the counts to the console
scala> wordCount.print()   // ... and even if the above did work,
'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
as written.


Instead, I got the following to run instead:
scala> val wordCounts = words.countByValue()
scala> wordCounts.print()
scala> ssc.start()             // Start the computation
scala> ssc.awaitTermination()

This worked if I ran 'nc -lk 1234' in another terminal and typed some words
into it.. but the 'wordCounts.print()' statement would only emit things to
stdout if I sent a ^D into the netcat stream. It seems to print the output
for all 2-second windows all-at-once after the ^D in the network stream. Is
this an expected effect? I don't understand the semantics of ssc.start /
awaitTermination well enough to know how it interacts with the print
statement on wordCounts (which I think is a DStream of RRDs?)

I set spark.cleaner.ttl to a relatively high value (I'm not sure what units
those are.. seconds or millis) because a lower value caused stderr to spam
everywhere and make my terminal unreadable. Is that part of my issue? the
spark repl said I had to set it, so I just picked a number.

I kind of expected wordCounts.print() to be constantly emitting (word, N)
pairs to my spark terminal as I typed into the netcat side of things.

I'm using Spark built from github source that I pulled from source earlier
today.

I am using the following as my 'origin':
  Fetch URL: git://github.com/apache/incubator-spark.git

... and the most recent commit (master a.k.a. HEAD) is:
commit 4d880304867b55a4f2138617b30600b7fa013b14
Author: Bryn Keller <br...@intel.com>
Date:   Mon Feb 24 17:35:22 2014 -0800


In any case, I'm happy to help update the docs (or the code) if this is a
bug. I realize this is getting long-winded. But in any case, I think my
questions really boil down to:

1) should there be a reduceByKey() method on DStream? The documentation at
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmlsays
so in the "Transformations" section, but the scaladoc at
https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStreamdoesn't
list it.  DStream.scala also doesn't have a definition for such a
method...

(and based on reading the source of NetworkWordCount.scala, I can't
spot-identify why this *does* work there (i.e., reduceByKey compiles) but
it doesn't do so in the terminal)

2) Why do I have to wait for the stream to "terminate" with a ^D before
seeing any stdout in the repl from the wordCounts.print() statement?
 Doesn't this defeat the point of "streaming"?
2a) how does the print() statement interact with ssc.start() and
ssc.awaitTermination() ?

3) is the cleaner TTL something that, as a user, I should be adjusting to
change my observed effects? i.e., would adjusting this change the frequency
of emissions to stdout of prior window data?  Or is this just a background
property that happens to affect the spamminess of my stderr that is routed
to the same console?

4) Should I update the documentation to match my example (i.e., no
reduceByKey, but use words.countByValue() instead)?

5) Now that Spark is a TLP, are my references to the incubator-spark.git
and the http://spark.incubator.apache.org docs woefully out of date, making
this entire exercise a goof? :)

Thanks for the help!

Cheers,
- Aaron

Re: error in streaming word count API?

Posted by Aaron Kimball <ak...@gmail.com>.
Filed SPARK-1173 and sent a pull request. As an aside, I think this should
probably have been in the STREAMING project on the JIRA, but JIRA seemed
adamant that it only allow me to create new issues in the SPARK project.
Not sure if that's a JIRA permissions thing, or me losing a fight with
Atlassian UX ;) Please let me know if I should do something different next
time.

Cheers,
- Aaron


On Sun, Mar 2, 2014 at 10:28 PM, Aaron Kimball <ak...@gmail.com> wrote:

> Running `nc -lk 1234`  in one terminal, and running `nc localhost 1234` in
> another, it demonstrates line-buffered behavior. It's a mystery!
>
> Thanks for the link on implicit conversions. The example makes sense.
>  Makes the code easier to trace too. I'll send a JIRA + pull req to touch
> up the docs.
>
> cheers,
> - Aaron
>
>
> On Sun, Mar 2, 2014 at 4:59 PM, Matei Zaharia <ma...@gmail.com>wrote:
>
>> Hi Aaron,
>>
>> On Feb 28, 2014, at 8:46 PM, Aaron Kimball <ak...@gmail.com> wrote:
>>
>> > Hi folks,
>> >
>> > I was trying to work through the streaming word count example at
>> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmland couldn't get the code as-written to run. In fairness, I was trying to
>> do this inside the REPL rather than compiling a separate project; would the
>> types be different?
>> >
>> > In any case, here's the code I ran:
>> >
>> > $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>> >
>> > scala> import org.apache.spark.streaming._
>> > scala> val ssc = new StreamingContext(sc, Seconds(2))
>> > scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
>> > scala> val words = lines.flatMap(_.split(" "))
>> >
>> > // *** The following code from the html page doesn't work
>> > // because pairs has type DStream[(String, Int)] and
>> > // there is no reduceByKey method on this type.
>>
>> This seems to be an oversight in the docs. You need to import
>> org.apache.spark.streaming.StreamingContext._ in order to get the pair
>> functions on DStreams of pairs (through a Scala implicit conversion).
>> reduceByKey is actually a function on something called
>> PairDStreamFunctions, and the implicit conversion above provides it for you
>> only if your DStream has key-value pairs. See
>> http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.htmlfor how this works.
>>
>>
>> > // Count each word in each batch
>> > scala> val pairs = words.map(word => (word, 1))
>> > scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no
>> reduceByKey()
>> >
>> > // Print a few of the counts to the console
>> > scala> wordCount.print()   // ... and even if the above did work,
>> 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
>> as written.
>>
>> Also looks like a bug in the docs.
>>
>> >
>> > Instead, I got the following to run instead:
>> > scala> val wordCounts = words.countByValue()
>> > scala> wordCounts.print()
>> > scala> ssc.start()             // Start the computation
>> > scala> ssc.awaitTermination()
>> >
>> > This worked if I ran 'nc -lk 1234' in another terminal and typed some
>> words into it.. but the 'wordCounts.print()' statement would only emit
>> things to stdout if I sent a ^D into the netcat stream. It seems to print
>> the output for all 2-second windows all-at-once after the ^D in the network
>> stream. Is this an expected effect? I don't understand the semantics of
>> ssc.start / awaitTermination well enough to know how it interacts with the
>> print statement on wordCounts (which I think is a DStream of RRDs?)
>>
>> It might also be that netcat didn't flush the stream right away when you
>> type input. Not 100% sure about that though. You could try to listen to it
>> using netcat on a different port and see if it does.
>>
>> >
>> > I set spark.cleaner.ttl to a relatively high value (I'm not sure what
>> units those are.. seconds or millis) because a lower value caused stderr to
>> spam everywhere and make my terminal unreadable. Is that part of my issue?
>> the spark repl said I had to set it, so I just picked a number.
>>
>> This shouldn't matter for this problem.
>>
>> > 5) Now that Spark is a TLP, are my references to the
>> incubator-spark.git and the http://spark.incubator.apache.org docs
>> woefully out of date, making this entire exercise a goof? :)
>>
>> If you find these, definitely feel free to fix them, though I believe
>> some recent pull requests fixed a few of them.
>>
>> Anyway, thanks for reporting this stuff!
>>
>> Matei
>>
>>
>

Re: error in streaming word count API?

Posted by Aaron Kimball <ak...@gmail.com>.
Running `nc -lk 1234`  in one terminal, and running `nc localhost 1234` in
another, it demonstrates line-buffered behavior. It's a mystery!

Thanks for the link on implicit conversions. The example makes sense.
 Makes the code easier to trace too. I'll send a JIRA + pull req to touch
up the docs.

cheers,
- Aaron


On Sun, Mar 2, 2014 at 4:59 PM, Matei Zaharia <ma...@gmail.com>wrote:

> Hi Aaron,
>
> On Feb 28, 2014, at 8:46 PM, Aaron Kimball <ak...@gmail.com> wrote:
>
> > Hi folks,
> >
> > I was trying to work through the streaming word count example at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmland couldn't get the code as-written to run. In fairness, I was trying to
> do this inside the REPL rather than compiling a separate project; would the
> types be different?
> >
> > In any case, here's the code I ran:
> >
> > $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
> >
> > scala> import org.apache.spark.streaming._
> > scala> val ssc = new StreamingContext(sc, Seconds(2))
> > scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> > scala> val words = lines.flatMap(_.split(" "))
> >
> > // *** The following code from the html page doesn't work
> > // because pairs has type DStream[(String, Int)] and
> > // there is no reduceByKey method on this type.
>
> This seems to be an oversight in the docs. You need to import
> org.apache.spark.streaming.StreamingContext._ in order to get the pair
> functions on DStreams of pairs (through a Scala implicit conversion).
> reduceByKey is actually a function on something called
> PairDStreamFunctions, and the implicit conversion above provides it for you
> only if your DStream has key-value pairs. See
> http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.htmlfor how this works.
>
>
> > // Count each word in each batch
> > scala> val pairs = words.map(word => (word, 1))
> > scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no
> reduceByKey()
> >
> > // Print a few of the counts to the console
> > scala> wordCount.print()   // ... and even if the above did work,
> 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
> as written.
>
> Also looks like a bug in the docs.
>
> >
> > Instead, I got the following to run instead:
> > scala> val wordCounts = words.countByValue()
> > scala> wordCounts.print()
> > scala> ssc.start()             // Start the computation
> > scala> ssc.awaitTermination()
> >
> > This worked if I ran 'nc -lk 1234' in another terminal and typed some
> words into it.. but the 'wordCounts.print()' statement would only emit
> things to stdout if I sent a ^D into the netcat stream. It seems to print
> the output for all 2-second windows all-at-once after the ^D in the network
> stream. Is this an expected effect? I don't understand the semantics of
> ssc.start / awaitTermination well enough to know how it interacts with the
> print statement on wordCounts (which I think is a DStream of RRDs?)
>
> It might also be that netcat didn't flush the stream right away when you
> type input. Not 100% sure about that though. You could try to listen to it
> using netcat on a different port and see if it does.
>
> >
> > I set spark.cleaner.ttl to a relatively high value (I'm not sure what
> units those are.. seconds or millis) because a lower value caused stderr to
> spam everywhere and make my terminal unreadable. Is that part of my issue?
> the spark repl said I had to set it, so I just picked a number.
>
> This shouldn't matter for this problem.
>
> > 5) Now that Spark is a TLP, are my references to the incubator-spark.git
> and the http://spark.incubator.apache.org docs woefully out of date,
> making this entire exercise a goof? :)
>
> If you find these, definitely feel free to fix them, though I believe some
> recent pull requests fixed a few of them.
>
> Anyway, thanks for reporting this stuff!
>
> Matei
>
>

Re: error in streaming word count API?

Posted by Matei Zaharia <ma...@gmail.com>.
Hi Aaron,

On Feb 28, 2014, at 8:46 PM, Aaron Kimball <ak...@gmail.com> wrote:

> Hi folks,
> 
> I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and couldn't get the code as-written to run. In fairness, I was trying to do this inside the REPL rather than compiling a separate project; would the types be different?
> 
> In any case, here's the code I ran:
> 
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
> 
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
> 
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.

This seems to be an oversight in the docs. You need to import org.apache.spark.streaming.StreamingContext._ in order to get the pair functions on DStreams of pairs (through a Scala implicit conversion). reduceByKey is actually a function on something called PairDStreamFunctions, and the implicit conversion above provides it for you only if your DStream has key-value pairs. See http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html for how this works.


> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()
> 
> // Print a few of the counts to the console
> scala> wordCount.print()   // ... and even if the above did work, 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile as written.

Also looks like a bug in the docs.

> 
> Instead, I got the following to run instead:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
> 
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it.. but the 'wordCounts.print()' statement would only emit things to stdout if I sent a ^D into the netcat stream. It seems to print the output for all 2-second windows all-at-once after the ^D in the network stream. Is this an expected effect? I don't understand the semantics of ssc.start / awaitTermination well enough to know how it interacts with the print statement on wordCounts (which I think is a DStream of RRDs?)  

It might also be that netcat didn’t flush the stream right away when you type input. Not 100% sure about that though. You could try to listen to it using netcat on a different port and see if it does.

> 
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units those are.. seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? the spark repl said I had to set it, so I just picked a number.

This shouldn’t matter for this problem.

> 5) Now that Spark is a TLP, are my references to the incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)

If you find these, definitely feel free to fix them, though I believe some recent pull requests fixed a few of them.

Anyway, thanks for reporting this stuff!

Matei


Re: error in streaming word count API?

Posted by Aaron Kimball <ak...@gmail.com>.
As a post-script, when running the example in precompiled form:

/bin/run-example org.apache.spark.streaming.examples.NetworkWordCount
local[2] localhost 9999


... I don't need to send a ^D to the netcat stream. It does print the
batches to stdout in the manner I'd expect. So is this more repl weirdness
than spark weirdness?

- Aaron


On Fri, Feb 28, 2014 at 8:46 PM, Aaron Kimball <ak...@gmail.com> wrote:

> Hi folks,
>
> I was trying to work through the streaming word count example at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmland couldn't get the code as-written to run. In fairness, I was trying to
> do this inside the REPL rather than compiling a separate project; would the
> types be different?
>
> In any case, here's the code I ran:
>
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
>
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.
>
> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no
> reduceByKey()
>
> // Print a few of the counts to the console
> scala> wordCount.print()   // ... and even if the above did work,
> 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
> as written.
>
>
> Instead, I got the following to run instead:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
>
> This worked if I ran 'nc -lk 1234' in another terminal and typed some
> words into it.. but the 'wordCounts.print()' statement would only emit
> things to stdout if I sent a ^D into the netcat stream. It seems to print
> the output for all 2-second windows all-at-once after the ^D in the network
> stream. Is this an expected effect? I don't understand the semantics of
> ssc.start / awaitTermination well enough to know how it interacts with the
> print statement on wordCounts (which I think is a DStream of RRDs?)
>
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what
> units those are.. seconds or millis) because a lower value caused stderr to
> spam everywhere and make my terminal unreadable. Is that part of my issue?
> the spark repl said I had to set it, so I just picked a number.
>
> I kind of expected wordCounts.print() to be constantly emitting (word, N)
> pairs to my spark terminal as I typed into the netcat side of things.
>
> I'm using Spark built from github source that I pulled from source earlier
> today.
>
> I am using the following as my 'origin':
>   Fetch URL: git://github.com/apache/incubator-spark.git
>
> ... and the most recent commit (master a.k.a. HEAD) is:
> commit 4d880304867b55a4f2138617b30600b7fa013b14
> Author: Bryn Keller <br...@intel.com>
> Date:   Mon Feb 24 17:35:22 2014 -0800
>
>
> In any case, I'm happy to help update the docs (or the code) if this is a
> bug. I realize this is getting long-winded. But in any case, I think my
> questions really boil down to:
>
> 1) should there be a reduceByKey() method on DStream? The documentation at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.htmlsays so in the "Transformations" section, but the scaladoc at
> https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStreamdoesn't list it.  DStream.scala also doesn't have a definition for such a
> method...
>
> (and based on reading the source of NetworkWordCount.scala, I can't
> spot-identify why this *does* work there (i.e., reduceByKey compiles) but
> it doesn't do so in the terminal)
>
> 2) Why do I have to wait for the stream to "terminate" with a ^D before
> seeing any stdout in the repl from the wordCounts.print() statement?
>  Doesn't this defeat the point of "streaming"?
> 2a) how does the print() statement interact with ssc.start() and
> ssc.awaitTermination() ?
>
> 3) is the cleaner TTL something that, as a user, I should be adjusting to
> change my observed effects? i.e., would adjusting this change the frequency
> of emissions to stdout of prior window data?  Or is this just a background
> property that happens to affect the spamminess of my stderr that is routed
> to the same console?
>
> 4) Should I update the documentation to match my example (i.e., no
> reduceByKey, but use words.countByValue() instead)?
>
> 5) Now that Spark is a TLP, are my references to the incubator-spark.git
> and the http://spark.incubator.apache.org docs woefully out of date,
> making this entire exercise a goof? :)
>
> Thanks for the help!
>
> Cheers,
> - Aaron
>
>