Posted to user@spark.apache.org by Diana Carroll <dc...@cloudera.com> on 2014/03/27 16:02:58 UTC

spark streaming and the spark shell

I'm working with spark streaming using spark-shell, and hoping folks could
answer a few questions I have.

I'm doing WordCount on a socket stream:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc,Seconds(5))
var mystream = ssc.socketTextStream("localhost",4444)
var words = mystream.flatMap(line => line.split(" "))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()
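In the shell, ssc.start() returns immediately and the batches run in the background; a standalone driver would normally block after this point with awaitTermination (discussed later in this thread). A minimal sketch:

// Test input: run `nc -lk 4444` in another terminal, as in the streaming guide.
ssc.start()
ssc.awaitTermination()   // blocks until ssc.stop() is called or the context fails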



1.  I'm assuming that using spark shell is an edge case, and that spark
streaming is really intended mostly for batch use.  True?

2.   I notice that once I start ssc.start(), my stream starts processing
and continues indefinitely...even if I close the socket on the server end
(I'm using unix command "nc" to mimic a server as explained in the
streaming programming guide .)  Can I tell my stream to detect if it's lost
a connection and therefore stop executing?  (Or even better, to attempt to
re-establish the connection?)

3.  I tried entering ssc.stop which resulted in an error:

Exception in thread "Thread-43" org.apache.spark.SparkException: Job
cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
SendingConnectionManagerId not found

But it did stop the DStream execution.

4.  Then I tried restarting the ssc again (ssc.start) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting an ssc supported?

5.  When I perform an operation like wordCounts.print(), that operation
will execute on each batch, every n seconds.  Is there a way I can undo
that operation?  That is, I want it to *stop* executing that print every n
seconds...without having to stop the stream.

What I'm really asking is...can I explore DStreams interactively the way I
can explore my data in regular Spark?  In regular Spark, I might perform
various operations on an RDD to see what happens.  So at first, I might
have used split(" ") to tokenize my input text, but now I want to try
using split(",") instead, after the stream has already started running.
Can I do that?

I did find out that if I add a new operation to an existing DStream (say,
words.print()) *after* the ssc.start() it works. It *will* add the second
print() call to the execution list every n seconds.

But if I try to add new DStreams, e.g.
...

ssc.start()

var testpairs = words.map(x => (x, "TEST"))
testpairs.print()


I get an error:

14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
1395932270000 ms
java.lang.Exception:
org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
initialized


Is this sort of interactive use just not supported?

Thanks!

Diana

Re: spark streaming and the spark shell

Posted by Diana Carroll <dc...@cloudera.com>.
Thanks, Tathagata.  This and your other reply on awaitTermination are very
helpful.

Diana


On Thu, Mar 27, 2014 at 4:40 PM, Tathagata Das
<ta...@gmail.com> wrote:

> Very good questions! Responses inline.
>
> TD
>
> On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <dc...@cloudera.com>
> wrote:
> > I'm working with spark streaming using spark-shell, and hoping folks
> could
> > answer a few questions I have.
> >
> > I'm doing WordCount on a socket stream:
> >
> > import org.apache.spark.streaming.StreamingContext
> > import org.apache.spark.streaming.StreamingContext._
> > import org.apache.spark.streaming.Seconds
> > var ssc = new StreamingContext(sc,Seconds(5))
> > var mystream = ssc.socketTextStream("localhost",4444)
> > var words = mystream.flatMap(line => line.split(" "))
> > var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> > wordCounts.print()
> > ssc.start()
> >
> >
> >
> > 1.  I'm assuming that using spark shell is an edge case, and that spark
> > streaming is really intended mostly for batch use.  True?
> >
>
> Yes. Currently the spark-shell is not the intended execution mode for
> Spark Streaming, even though it can be done for quick testing.
>
> > 2.   I notice that once I start ssc.start(), my stream starts processing
> and
> > continues indefinitely...even if I close the socket on the server end
> (I'm
> > using unix command "nc" to mimic a server as explained in the streaming
> > programming guide .)  Can I tell my stream to detect if it's lost a
> > connection and therefore stop executing?  (Or even better, to attempt to
> > re-establish the connection?)
> >
>
>
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.
>
> > 3.  I tried entering ssc.stop which resulted in an error:
> >
> > Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> > cancelled because SparkContext was shut down
> > 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> > SendingConnectionManagerId not found
> >
> > But it did stop the DStream execution.
> >
>
>
> Ah, that happens sometimes. The existing behavior of ssc.stop() is
> that it will stop everything immediately.
> I just opened a pull request for a more graceful shutdown of the
> Spark Streaming program.
> https://github.com/apache/spark/pull/247
>
> > 4.  Then I tried restarting the ssc again (ssc.start) and got another
> error:
> > org.apache.spark.SparkException: JobScheduler already started
> > Is restarting an ssc supported?
> >
>
>
> Restarting is ideally not supported. However, the behavior was not
> explicitly checked. The above pull request
> makes the behavior more explicit by throwing the right warnings and
> exceptions.
>
> > 5.  When I perform an operation like wordCounts.print(), that operation
> > will execute on each batch, every n seconds.  Is there a way I can undo that
> > operation?  That is, I want it to *stop* executing that print every n
> > seconds...without having to stop the stream.
> >
> > What I'm really asking is...can I explore DStreams interactively the way I
> > can explore my data in regular Spark?  In regular Spark, I might perform
> > various operations on an RDD to see what happens.  So at first, I might
> > have used split(" ") to tokenize my input text, but now I want to try using
> > split(",") instead, after the stream has already started running.  Can I
> > do that?
> >
> > I did find out that if I add a new operation to an existing DStream (say,
> > words.print()) after the ssc.start() it works. It *will* add the second
> > print() call to the execution list every n seconds.
> >
> > but if I try to add new dstreams, e.g.
> > ...
> >
> > ssc.start()
> >
> > var testpairs = words.map(x => (x, "TEST"))
> > testpairs.print()
> >
> >
> > I get an error:
> >
> > 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> > 1395932270000 ms
> > java.lang.Exception:
> > org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> > initialized
> >
> >
> > Is this sort of interactive use just not supported?
>
>
> Modifying the DStream operations after the context has started is not
> officially supported. However, dynamically changing the computation can
> be done using DStream.transform() or DStream.foreachRDD().
> Both these operations allow you to do arbitrary RDD operations on each
> RDD. So you can dynamically modify what RDD operations are used within
> the DStream transform / foreachRDD (so you are not changing the
> DStream operations, only what's inside the DStream operation). But to
> use this really interactively, you have to write a bit of additional
> code that allows the user to interactively specify the function
> applied on each RDD.
>
>
>
> >
> > Thanks!
> >
> > Diana
>

Re: spark streaming and the spark shell

Posted by Evgeny Shishkin <it...@gmail.com>.
On 28 Mar 2014, at 01:37, Tathagata Das <ta...@gmail.com> wrote:

> I see! As I said in the other thread, no one reported these issues until now! A good and not-too-hard fix is to add the ability to limit the data rate at which the receivers receive. I have opened a JIRA.
> 

Yes, and actually you should open another JIRA for this:
https://github.com/apache/incubator-spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L106

This just erases offsets from ZooKeeper. But auto.offset.reset has a different meaning.

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest

I will stress it: WHEN THERE IS NO INITIAL OFFSET OR IT IS OUT OF RANGE,
not “hey! i’ll just reset your position because you restarted the app”.
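For illustration, a minimal sketch of passing auto.offset.reset explicitly through kafkaParams when creating the Kafka stream (the ZooKeeper host, group id, and topic below are placeholders); per the Kafka docs quoted above, the setting is only supposed to apply when there is no stored offset or the stored offset is out of range:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder connection settings; adjust for your cluster.
val kafkaParams = Map(
  "zookeeper.connect" -> "zkhost:2181",
  "group.id"          -> "my-consumer-group",
  // Only meant to be used when there is no initial offset in ZooKeeper
  // or the stored offset is out of range.
  "auto.offset.reset" -> "smallest"
)

val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)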

> TD
> 
> 
> On Thu, Mar 27, 2014 at 3:28 PM, Evgeny Shishkin <it...@gmail.com> wrote:
> 
> On 28 Mar 2014, at 01:13, Tathagata Das <ta...@gmail.com> wrote:
> 
>> Seems like the configuration of the Spark worker is not right. Either the worker has not been given enough memory or the allocation of the memory to the RDD storage needs to be fixed. If configured correctly, the Spark workers should not get OOMs.
> 
> 
> Yes, it is easy to start with latest offsets, get steady configuration and everything is nice.
> 
> Then your machine fails. And you stop receiving anything from Kafka.
> 
> Then you notice this and restart your app hoping it would continue from offsets on zookeeper.
> BUT NO
> YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER
> 
> After we fixed erasing offsets, we start from Some Offsets in the past.
> And during batch duration we can’t limit how many messages we get from Kafka.
> AND HERE WE OOM
> 
> And it's just a pain. Complete pain.
> 
> And remember, only some machines consume. Usually two or three. Because of the broken high-level consumer in Kafka.
> 


Re: spark streaming and the spark shell

Posted by Tathagata Das <ta...@gmail.com>.
I see! As I said in the other thread, no one reported these issues until
now! A good and not-too-hard fix is to add the ability to limit the data
rate at which the receivers receive. I have opened a JIRA.
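The kind of knob this points at is a per-receiver records-per-second cap; a minimal sketch, assuming a Spark release where the spark.streaming.receiver.maxRate property is available:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each receiver at roughly 10,000 records per second (placeholder value)
// so a restart from old offsets cannot flood a single batch.
val conf = new SparkConf()
  .setAppName("RateLimitedStreaming")
  .set("spark.streaming.receiver.maxRate", "10000")
val ssc = new StreamingContext(conf, Seconds(5))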

TD


On Thu, Mar 27, 2014 at 3:28 PM, Evgeny Shishkin <it...@gmail.com> wrote:

>
> On 28 Mar 2014, at 01:13, Tathagata Das <ta...@gmail.com>
> wrote:
>
> Seems like the configuration of the Spark worker is not right. Either the
> worker has not been given enough memory or the allocation of the memory to
> the RDD storage needs to be fixed. If configured correctly, the Spark
> workers should not get OOMs.
>
>
>
> Yes, it is easy to start with latest offsets, get steady configuration and
> everything is nice.
>
> Then your machine fails. And you stop receiving anything from Kafka.
>
> Then you notice this and restart your app hoping it would continue from
> offsets on zookeeper.
> BUT NO
> YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER
>
> After we fixed erasing offsets, we start from Some Offsets in the past.
> And during batch duration we can't limit how many messages we get from
> Kafka.
> AND HERE WE OOM
>
> And it's just a pain. Complete pain.
>
> And remember, only some machines consume. Usually two or three.
> Because of the broken high-level consumer in Kafka.
>

Re: spark streaming and the spark shell

Posted by Evgeny Shishkin <it...@gmail.com>.
On 28 Mar 2014, at 01:13, Tathagata Das <ta...@gmail.com> wrote:

> Seems like the configuration of the Spark worker is not right. Either the worker has not been given enough memory or the allocation of the memory to the RDD storage needs to be fixed. If configured correctly, the Spark workers should not get OOMs.


Yes, it is easy to start with latest offsets, get steady configuration and everything is nice.

Then your machine fails. And you stop receiving anything from Kafka.

Then you notice this and restart your app hoping it would continue from offsets on zookeeper.
BUT NO
YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER

After we fixed erasing offsets, we start from Some Offsets in the past.
And during batch duration we can’t limit how many messages we get from Kafka.
AND HERE WE OOM

And it's just a pain. Complete pain.

And remember, only some machines consume. Usually two or three. Because of the broken high-level consumer in Kafka.

Re: spark streaming and the spark shell

Posted by Tathagata Das <ta...@gmail.com>.
Seems like the configuration of the Spark worker is not right. Either the
worker has not been given enough memory or the allocation of the memory to
the RDD storage needs to be fixed. If configured correctly, the Spark
workers should not get OOMs.
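A minimal sketch of the kind of tuning meant here, with placeholder values (spark.storage.memoryFraction applies to releases of this era, before unified memory management):

import org.apache.spark.SparkConf

// Give each worker's executor more heap and bound the fraction reserved for
// cached RDD blocks, leaving headroom for receivers and shuffle buffers.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")          // placeholder size
  .set("spark.storage.memoryFraction", "0.5")  // lower to leave more headroom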



On Thu, Mar 27, 2014 at 2:52 PM, Evgeny Shishkin <it...@gmail.com> wrote:

>
> 2.   I notice that once I start ssc.start(), my stream starts processing
> and
> continues indefinitely...even if I close the socket on the server end (I'm
> using unix command "nc" to mimic a server as explained in the streaming
> programming guide .)  Can I tell my stream to detect if it's lost a
> connection and therefore stop executing?  (Or even better, to attempt to
> re-establish the connection?)
>
>
>
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.
>
>
> Now I understand why our Spark Streaming job starts to generate zero-sized
> RDDs from the Kafka input
> when one worker gets an OOM or crashes.
>
> And we can't detect it! Great. So Spark Streaming just doesn't suit 24/7
> operation yet =\
>

Re: spark streaming and the spark shell

Posted by Tian Zhang <tz...@yahoo.com>.
I am hitting the same issue, i.e., after running for some time, if the Spark
Streaming job loses or times out its Kafka connection, it just starts to
return empty RDDs.
Is there a timeline for when this issue will be fixed so that I can plan
accordingly?

Thanks.

Tian





Re: spark streaming and the spark shell

Posted by Evgeny Shishkin <it...@gmail.com>.
> 
>> 2.   I notice that once I start ssc.start(), my stream starts processing and
>> continues indefinitely...even if I close the socket on the server end (I'm
>> using unix command "nc" to mimic a server as explained in the streaming
>> programming guide .)  Can I tell my stream to detect if it's lost a
>> connection and therefore stop executing?  (Or even better, to attempt to
>> re-establish the connection?)
>> 
> 
> 
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.

Now I understand why our Spark Streaming job starts to generate zero-sized RDDs from the Kafka input
when one worker gets an OOM or crashes.

And we can't detect it! Great. So Spark Streaming just doesn't suit 24/7 operation yet =\

Re: spark streaming and the spark shell

Posted by Tathagata Das <ta...@gmail.com>.
Very good questions! Responses inline.

TD

On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <dc...@cloudera.com> wrote:
> I'm working with spark streaming using spark-shell, and hoping folks could
> answer a few questions I have.
>
> I'm doing WordCount on a socket stream:
>
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.Seconds
> var ssc = new StreamingContext(sc,Seconds(5))
> var mystream = ssc.socketTextStream("localhost",4444)
> var words = mystream.flatMap(line => line.split(" "))
> var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> wordCounts.print()
> ssc.start()
>
>
>
> 1.  I'm assuming that using spark shell is an edge case, and that spark
> streaming is really intended mostly for batch use.  True?
>

Yes. Currently the spark-shell is not the intended execution mode for
Spark Streaming, even though it can be done for quick testing.

> 2.   I notice that once I start ssc.start(), my stream starts processing and
> continues indefinitely...even if I close the socket on the server end (I'm
> using unix command "nc" to mimic a server as explained in the streaming
> programming guide .)  Can I tell my stream to detect if it's lost a
> connection and therefore stop executing?  (Or even better, to attempt to
> re-establish the connection?)
>


Currently, not yet. But I am aware of this and this behavior will be
improved in the future.

> 3.  I tried entering ssc.stop which resulted in an error:
>
> Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> cancelled because SparkContext was shut down
> 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> SendingConnectionManagerId not found
>
> But it did stop the DStream execution.
>


Ah, that happens sometimes. The existing behavior of ssc.stop() is
that it will stop everything immediately.
I just opened a pull request for a more graceful shutdown of the
Spark Streaming program.
https://github.com/apache/spark/pull/247
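Once that change is in, stop() takes flags for this; a minimal sketch, assuming a release that includes the graceful-shutdown support:

// Stop the streaming computation but keep the underlying SparkContext alive
// (useful in the shell), letting received data finish processing first.
ssc.stop(stopSparkContext = false, stopGracefully = true)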

> 4.  Then I tried restarting the ssc again (ssc.start) and got another error:
> org.apache.spark.SparkException: JobScheduler already started
> Is restarting an ssc supported?
>


Restarting is ideally not supported. However, the behavior was not
explicitly checked. The above pull request
makes the behavior more explicit by throwing the right warnings and
exceptions.

> 5.  When I perform an operation like wordCounts.print(), that operation will
> execute on each batch, every n seconds.  Is there a way I can undo that
> operation?  That is, I want it to *stop* executing that print every n
> seconds...without having to stop the stream.
>
> What I'm really asking is...can I explore DStreams interactively the way I
> can explore my data in regular Spark?  In regular Spark, I might perform
> various operations on an RDD to see what happens.  So at first, I might have
> used split(" ") to tokenize my input text, but now I want to try using
> split(",") instead, after the stream has already started running.  Can I do
> that?
>
> I did find out that if I add a new operation to an existing DStream (say,
> words.print()) after the ssc.start() it works. It *will* add the second
> print() call to the execution list every n seconds.
>
> but if I try to add new dstreams, e.g.
> ...
>
> ssc.start()
>
> var testpairs = words.map(x => (x, "TEST"))
> testpairs.print()
>
>
> I get an error:
>
> 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> 1395932270000 ms
> java.lang.Exception:
> org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> initialized
>
>
> Is this sort of interactive use just not supported?


Modifying the DStream operations after the context has started is not
officially supported. However, dynamically changing the computation can
be done using DStream.transform() or DStream.foreachRDD().
Both these operations allow you to do arbitrary RDD operations on each
RDD. So you can dynamically modify what RDD operations are used within
the DStream transform / foreachRDD (so you are not changing the
DStream operations, only what's inside the DStream operation). But to
use this really interactively, you have to write a bit of additional
code that allows the user to interactively specify the function
applied on each RDD.
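As a concrete illustration of that pattern, a minimal sketch built on the mystream defined earlier (the var name tokenize is just for this example). Because the transform body runs on the driver for every batch, reassigning the var from the shell changes the tokenization for subsequent batches without stopping the stream:

// Hold the per-RDD tokenization in a driver-side var that transform()
// re-reads on every batch interval.
var tokenize: String => Seq[String] = _.split(" ").toSeq

val tokenized = mystream.transform { rdd =>
  val f = tokenize           // snapshot the current function for this batch
  rdd.flatMap(f)
}
tokenized.print()
ssc.start()

// Later, from the shell, switch tokenizers while the stream keeps running:
tokenize = _.split(",").toSeq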



>
> Thanks!
>
> Diana