You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Tim Smith <se...@gmail.com> on 2014/09/12 19:09:53 UTC

Stable spark streaming app

Hi,

Anyone have a stable streaming app running in "production"? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Stable spark streaming app

Posted by ab...@thomsonreuters.com.
Nice write-up... very helpful!


-----Original Message-----
From: Tim Smith [mailto:secsubs@gmail.com] 
Sent: Wednesday, September 17, 2014 1:11 PM
Cc: spark users
Subject: Re: Stable spark streaming app

I don't have anything in production yet but I now at least have a stable (running for more than 24 hours) streaming app. Earlier, the app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This might be more of a CDH issue. I noticed that while the Yarn container and other Spark ancillary tasks had GC options set at launch but none for the executors. So I played with different GC options and this worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the "stdout" file in the Yarn container logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will respect these. On CDH/CM specifically, even though you don't run Spark as a service (since you are using Yarn for RM), you can goto "Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.

- Set these two params - "spark.yarn.executor.memoryOverhead"
"spark.yarn.driver.memoryOverhead". Earlier, my app would get killed because the executors running the kafka receivers would get killed by Yarn for over utilization of memory. Now, these are my memory settings (I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \ --spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume "executor-memory" minus "spark.yarn.executor.memoryOverhead" so you should see each executor JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class myAwesomeApp \ --master yarn myawesomeapp.jar \ --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \ --spark.rdd.compress true \ --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \ --spark.akka.threads 64 \ --spark.akka.frameSize 500 \ --spark.task.maxFailures 64 \ --spark.scheduler.mode FAIR \ --spark.yarn.executor.memoryOverhead 4096 \ --spark.yarn.driver.memoryOverhead 1024 \ --spark.shuffle.consolidateFiles true \ --spark.default.parallelism 528 \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as you see above)
- I map one receive task for each kafka partition
- Instead of doing a "union" on all the incoming streams and then
repartition() I now repartition() each incoming stream and process them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a "union" on all incoming dStreams. I now repartition each of the five streams separately (in a loop) to 24.
- For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
- Process data per partition instead of per RDD: dataout.foreachRDD( rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create "new Producer", make sure I "close"
it else I had a ton of "too many files open" errors :)
- Use immutable objects as far as possible. If I use mutable objects within a method/class then I turn them into immutable before passing onto another class/method.
- For logging, create a LogService object that I then use for other object/class declarations. Once instantiated, I can make "logInfo"
calls from within other Objects/Methods/Classes and output goes to the "stderr" file in the Yarn container logs. Good for debugging stream processing logic.

Currently, my processing delay is lower than my dStream time window so all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar <ku...@gmail.com> wrote:
> Hmm, no response to this thread!
>
> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>
> ----- Original Message -----
> From: "Tim Smith" <se...@gmail.com>
> To: "spark users" <us...@spark.apache.org>
> Sent: Friday, September 12, 2014 10:09:53 AM
> Subject: Stable spark streaming app
>
> Hi,
>
> Anyone have a stable streaming app running in "production"? Can you 
> share some overview of the app and setup like number of nodes, events 
> per second, broad stream processing workflow, config highlights etc?
>
> Thanks,
>
> Tim
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For 
> additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Tim Smith <se...@gmail.com>.
Dibyendu - I am using the Kafka consumer built into Spark streaming.
Pulled the jar from here:
http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar

Thanks for the sbt-assembly link, Soumitra.

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya
<di...@gmail.com> wrote:
> Hi Tim
>
> Just curious to know ; Which Kafka Consumer you have used ?
>
> Dib
>
> On Sep 18, 2014 4:40 AM, "Tim Smith" <se...@gmail.com> wrote:
>>
>> Thanks :)
>>
>> On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
>> > Thanks Tim, this is super helpful!
>> >
>> > Question about jars and spark-submit:  why do you provide
>> > myawesomeapp.jar as the program jar but then include other jars via
>> > the --jars argument?  Have you tried building one uber jar with all
>> > dependencies and just sending that to Spark as your app jar?
>>
>> I guess that is mostly because I am Scala/sbt noob :) How do I create
>> the uber jar? My .sbt file says:
>> name := "My Awesome App"
>> version := "1.025"
>> scalaVersion := "2.10.4"
>> resolvers += "Apache repo" at
>> "https://repository.apache.org/content/repositories/releases"
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.0.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
>>
>> Then I run "sbt package" to generate myawesomeapp.jar.
>>
>> >
>> > Also, have you ever seen any issues with Spark caching your app jar
>> > between runs even if it changes?
>>
>> Not that I can tell but then maybe because I use Yarn, I might be
>> shielded from some jar distribution bugs in Spark?
>>
>>
>> >
>> > On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <se...@gmail.com> wrote:
>> >> I don't have anything in production yet but I now at least have a
>> >> stable (running for more than 24 hours) streaming app. Earlier, the
>> >> app would crash for all sorts of reasons. Caveats/setup:
>> >> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> >> - Yarn for RM
>> >> - Input and Output to Kafka
>> >> - CDH 5.1
>> >> - 11 node cluster with 32-cores and 48G max container size for each
>> >> node (Yarn managed)
>> >> - 5 partition Kafka topic - both in and out
>> >> - Roughly, an average of 25k messages per second
>> >> - App written in Scala (warning: I am a Scala noob)
>> >>
>> >> Few things I had to add/tweak to get the app to be stable:
>> >> - The executor JVMs did not have any GC options set, by default. This
>> >> might be more of a CDH issue. I noticed that while the Yarn container
>> >> and other Spark ancillary tasks had GC options set at launch but none
>> >> for the executors. So I played with different GC options and this
>> >> worked best:
>> >> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> >> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> >> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> >> -XX:+PrintGCDetails"
>> >>
>> >> I tried G1GC but for some reason it just didn't work. I am not a Java
>> >> programmer or expert so my conclusion is purely trial and error based.
>> >> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> >> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> >> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> >> specifically, even though you don't run Spark as a service (since you
>> >> are using Yarn for RM), you can goto "Spark Client Advanced
>> >> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> >> set SPARK_JAVA_OPTS there.
>> >>
>> >> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> >> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> >> because the executors running the kafka receivers would get killed by
>> >> Yarn for over utilization of memory. Now, these are my memory settings
>> >> (I will paste the entire app launch params later in the email):
>> >> --driver-memory 2G \
>> >> --executor-memory 16G \
>> >> --spark.yarn.executor.memoryOverhead 4096 \
>> >> --spark.yarn.driver.memoryOverhead 1024 \
>> >>
>> >> Your total executor JVM will consume "executor-memory" minus
>> >> "spark.yarn.executor.memoryOverhead" so you should see each executor
>> >> JVM consuming no more than 12G, in this case.
>> >>
>> >> Here is how I launch my app:
>> >> run=`date +"%m-%d-%YT%T"`; \
>> >> nohup spark-submit --class myAwesomeApp \
>> >> --master yarn myawesomeapp.jar \
>> >> --jars
>> >> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> >> \
>> >> --driver-memory 2G \
>> >> --executor-memory 16G \
>> >> --executor-cores 16 \
>> >> --num-executors 10 \
>> >> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> >> --spark.rdd.compress true \
>> >> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
>> >> \
>> >> --spark.akka.threads 64 \
>> >> --spark.akka.frameSize 500 \
>> >> --spark.task.maxFailures 64 \
>> >> --spark.scheduler.mode FAIR \
>> >> --spark.yarn.executor.memoryOverhead 4096 \
>> >> --spark.yarn.driver.memoryOverhead 1024 \
>> >> --spark.shuffle.consolidateFiles true \
>> >> --spark.default.parallelism 528 \
>> >>>logs/normRunLog-$run.log \
>> >> 2>logs/normRunLogError-$run.log & \
>> >> echo $! > logs/current-run.pid
>> >>
>> >> Some code optimizations (or, goof ups that I fixed). I did not
>> >> scientifically measure the impact of each but I think they helped:
>> >> - Made all my classes and objects serializable and then use Kryo (as
>> >> you see above)
>> >> - I map one receive task for each kafka partition
>> >> - Instead of doing a "union" on all the incoming streams and then
>> >> repartition() I now repartition() each incoming stream and process
>> >> them separately. I believe this reduces shuffle.
>> >> - Reduced number of repartitions. I was doing 128 after doing a
>> >> "union" on all incoming dStreams. I now repartition each of the five
>> >> streams separately (in a loop) to 24.
>> >> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
>> >> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> >> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> >> - Specific to kafka: when I create "new Producer", make sure I "close"
>> >> it else I had a ton of "too many files open" errors :)
>> >> - Use immutable objects as far as possible. If I use mutable objects
>> >> within a method/class then I turn them into immutable before passing
>> >> onto another class/method.
>> >> - For logging, create a LogService object that I then use for other
>> >> object/class declarations. Once instantiated, I can make "logInfo"
>> >> calls from within other Objects/Methods/Classes and output goes to the
>> >> "stderr" file in the Yarn container logs. Good for debugging stream
>> >> processing logic.
>> >>
>> >> Currently, my processing delay is lower than my dStream time window so
>> >> all is good. I get a ton of these errors in my driver logs:
>> >> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> >> threw an exception
>> >>
>> >> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>> >>
>> >> Best I understand and have been told, this does not affect data
>> >> integrity but may cause un-necessary recomputes.
>> >>
>> >> Hope this helps,
>> >>
>> >> Tim
>> >>
>> >>
>> >> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> >> <ku...@gmail.com> wrote:
>> >>> Hmm, no response to this thread!
>> >>>
>> >>> Adding to it, please share experiences of building an enterprise grade
>> >>> product based on Spark Streaming.
>> >>>
>> >>> I am exploring Spark Streaming for enterprise software and am
>> >>> cautiously optimistic about it. I see huge potential to improve
>> >>> debuggability of Spark.
>> >>>
>> >>> ----- Original Message -----
>> >>> From: "Tim Smith" <se...@gmail.com>
>> >>> To: "spark users" <us...@spark.apache.org>
>> >>> Sent: Friday, September 12, 2014 10:09:53 AM
>> >>> Subject: Stable spark streaming app
>> >>>
>> >>> Hi,
>> >>>
>> >>> Anyone have a stable streaming app running in "production"? Can you
>> >>> share some overview of the app and setup like number of nodes, events
>> >>> per second, broad stream processing workflow, config highlights etc?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Tim
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> For additional commands, e-mail: user-help@spark.apache.org
>> >>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Soumitra Kumar <ku...@gmail.com>.
Refer to https://github.com/sbt/sbt-assembly to generate a jar with dependencies.

I prefer not to build a big fat jar, since a bulk would be hadoop related and prefer to use what is installed on the host.

----- Original Message -----
From: "Tim Smith" <se...@gmail.com>
Cc: "spark users" <us...@spark.apache.org>
Sent: Wednesday, September 17, 2014 4:10:12 PM
Subject: Re: Stable spark streaming app

Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
> Thanks Tim, this is super helpful!
>
> Question about jars and spark-submit:  why do you provide
> myawesomeapp.jar as the program jar but then include other jars via
> the --jars argument?  Have you tried building one uber jar with all
> dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at
"https://repository.apache.org/content/repositories/releases"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.

>
> Also, have you ever seen any issues with Spark caching your app jar
> between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?


>
> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <se...@gmail.com> wrote:
>> I don't have anything in production yet but I now at least have a
>> stable (running for more than 24 hours) streaming app. Earlier, the
>> app would crash for all sorts of reasons. Caveats/setup:
>> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> - Yarn for RM
>> - Input and Output to Kafka
>> - CDH 5.1
>> - 11 node cluster with 32-cores and 48G max container size for each
>> node (Yarn managed)
>> - 5 partition Kafka topic - both in and out
>> - Roughly, an average of 25k messages per second
>> - App written in Scala (warning: I am a Scala noob)
>>
>> Few things I had to add/tweak to get the app to be stable:
>> - The executor JVMs did not have any GC options set, by default. This
>> might be more of a CDH issue. I noticed that while the Yarn container
>> and other Spark ancillary tasks had GC options set at launch but none
>> for the executors. So I played with different GC options and this
>> worked best:
>> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> -XX:+PrintGCDetails"
>>
>> I tried G1GC but for some reason it just didn't work. I am not a Java
>> programmer or expert so my conclusion is purely trial and error based.
>> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> specifically, even though you don't run Spark as a service (since you
>> are using Yarn for RM), you can goto "Spark Client Advanced
>> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> set SPARK_JAVA_OPTS there.
>>
>> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> because the executors running the kafka receivers would get killed by
>> Yarn for over utilization of memory. Now, these are my memory settings
>> (I will paste the entire app launch params later in the email):
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>>
>> Your total executor JVM will consume "executor-memory" minus
>> "spark.yarn.executor.memoryOverhead" so you should see each executor
>> JVM consuming no more than 12G, in this case.
>>
>> Here is how I launch my app:
>> run=`date +"%m-%d-%YT%T"`; \
>> nohup spark-submit --class myAwesomeApp \
>> --master yarn myawesomeapp.jar \
>> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> --spark.rdd.compress true \
>> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> --spark.akka.threads 64 \
>> --spark.akka.frameSize 500 \
>> --spark.task.maxFailures 64 \
>> --spark.scheduler.mode FAIR \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>> --spark.shuffle.consolidateFiles true \
>> --spark.default.parallelism 528 \
>>>logs/normRunLog-$run.log \
>> 2>logs/normRunLogError-$run.log & \
>> echo $! > logs/current-run.pid
>>
>> Some code optimizations (or, goof ups that I fixed). I did not
>> scientifically measure the impact of each but I think they helped:
>> - Made all my classes and objects serializable and then use Kryo (as
>> you see above)
>> - I map one receive task for each kafka partition
>> - Instead of doing a "union" on all the incoming streams and then
>> repartition() I now repartition() each incoming stream and process
>> them separately. I believe this reduces shuffle.
>> - Reduced number of repartitions. I was doing 128 after doing a
>> "union" on all incoming dStreams. I now repartition each of the five
>> streams separately (in a loop) to 24.
>> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
>> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> - Specific to kafka: when I create "new Producer", make sure I "close"
>> it else I had a ton of "too many files open" errors :)
>> - Use immutable objects as far as possible. If I use mutable objects
>> within a method/class then I turn them into immutable before passing
>> onto another class/method.
>> - For logging, create a LogService object that I then use for other
>> object/class declarations. Once instantiated, I can make "logInfo"
>> calls from within other Objects/Methods/Classes and output goes to the
>> "stderr" file in the Yarn container logs. Good for debugging stream
>> processing logic.
>>
>> Currently, my processing delay is lower than my dStream time window so
>> all is good. I get a ton of these errors in my driver logs:
>> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> threw an exception
>>
>> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>>
>> Best I understand and have been told, this does not affect data
>> integrity but may cause un-necessary recomputes.
>>
>> Hope this helps,
>>
>> Tim
>>
>>
>> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> <ku...@gmail.com> wrote:
>>> Hmm, no response to this thread!
>>>
>>> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>>>
>>> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>>>
>>> ----- Original Message -----
>>> From: "Tim Smith" <se...@gmail.com>
>>> To: "spark users" <us...@spark.apache.org>
>>> Sent: Friday, September 12, 2014 10:09:53 AM
>>> Subject: Stable spark streaming app
>>>
>>> Hi,
>>>
>>> Anyone have a stable streaming app running in "production"? Can you
>>> share some overview of the app and setup like number of nodes, events
>>> per second, broad stream processing workflow, config highlights etc?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Tim Smith <se...@gmail.com>.
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
> Thanks Tim, this is super helpful!
>
> Question about jars and spark-submit:  why do you provide
> myawesomeapp.jar as the program jar but then include other jars via
> the --jars argument?  Have you tried building one uber jar with all
> dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at
"https://repository.apache.org/content/repositories/releases"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.

>
> Also, have you ever seen any issues with Spark caching your app jar
> between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?


>
> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <se...@gmail.com> wrote:
>> I don't have anything in production yet but I now at least have a
>> stable (running for more than 24 hours) streaming app. Earlier, the
>> app would crash for all sorts of reasons. Caveats/setup:
>> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> - Yarn for RM
>> - Input and Output to Kafka
>> - CDH 5.1
>> - 11 node cluster with 32-cores and 48G max container size for each
>> node (Yarn managed)
>> - 5 partition Kafka topic - both in and out
>> - Roughly, an average of 25k messages per second
>> - App written in Scala (warning: I am a Scala noob)
>>
>> Few things I had to add/tweak to get the app to be stable:
>> - The executor JVMs did not have any GC options set, by default. This
>> might be more of a CDH issue. I noticed that while the Yarn container
>> and other Spark ancillary tasks had GC options set at launch but none
>> for the executors. So I played with different GC options and this
>> worked best:
>> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> -XX:+PrintGCDetails"
>>
>> I tried G1GC but for some reason it just didn't work. I am not a Java
>> programmer or expert so my conclusion is purely trial and error based.
>> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> specifically, even though you don't run Spark as a service (since you
>> are using Yarn for RM), you can goto "Spark Client Advanced
>> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> set SPARK_JAVA_OPTS there.
>>
>> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> because the executors running the kafka receivers would get killed by
>> Yarn for over utilization of memory. Now, these are my memory settings
>> (I will paste the entire app launch params later in the email):
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>>
>> Your total executor JVM will consume "executor-memory" minus
>> "spark.yarn.executor.memoryOverhead" so you should see each executor
>> JVM consuming no more than 12G, in this case.
>>
>> Here is how I launch my app:
>> run=`date +"%m-%d-%YT%T"`; \
>> nohup spark-submit --class myAwesomeApp \
>> --master yarn myawesomeapp.jar \
>> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> --spark.rdd.compress true \
>> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> --spark.akka.threads 64 \
>> --spark.akka.frameSize 500 \
>> --spark.task.maxFailures 64 \
>> --spark.scheduler.mode FAIR \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>> --spark.shuffle.consolidateFiles true \
>> --spark.default.parallelism 528 \
>>>logs/normRunLog-$run.log \
>> 2>logs/normRunLogError-$run.log & \
>> echo $! > logs/current-run.pid
>>
>> Some code optimizations (or, goof ups that I fixed). I did not
>> scientifically measure the impact of each but I think they helped:
>> - Made all my classes and objects serializable and then use Kryo (as
>> you see above)
>> - I map one receive task for each kafka partition
>> - Instead of doing a "union" on all the incoming streams and then
>> repartition() I now repartition() each incoming stream and process
>> them separately. I believe this reduces shuffle.
>> - Reduced number of repartitions. I was doing 128 after doing a
>> "union" on all incoming dStreams. I now repartition each of the five
>> streams separately (in a loop) to 24.
>> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
>> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> - Specific to kafka: when I create "new Producer", make sure I "close"
>> it else I had a ton of "too many files open" errors :)
>> - Use immutable objects as far as possible. If I use mutable objects
>> within a method/class then I turn them into immutable before passing
>> onto another class/method.
>> - For logging, create a LogService object that I then use for other
>> object/class declarations. Once instantiated, I can make "logInfo"
>> calls from within other Objects/Methods/Classes and output goes to the
>> "stderr" file in the Yarn container logs. Good for debugging stream
>> processing logic.
>>
>> Currently, my processing delay is lower than my dStream time window so
>> all is good. I get a ton of these errors in my driver logs:
>> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> threw an exception
>>
>> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>>
>> Best I understand and have been told, this does not affect data
>> integrity but may cause un-necessary recomputes.
>>
>> Hope this helps,
>>
>> Tim
>>
>>
>> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>> <ku...@gmail.com> wrote:
>>> Hmm, no response to this thread!
>>>
>>> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>>>
>>> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>>>
>>> ----- Original Message -----
>>> From: "Tim Smith" <se...@gmail.com>
>>> To: "spark users" <us...@spark.apache.org>
>>> Sent: Friday, September 12, 2014 10:09:53 AM
>>> Subject: Stable spark streaming app
>>>
>>> Hi,
>>>
>>> Anyone have a stable streaming app running in "production"? Can you
>>> share some overview of the app and setup like number of nodes, events
>>> per second, broad stream processing workflow, config highlights etc?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Paul Wais <pw...@yelp.com>.
Thanks Tim, this is super helpful!

Question about jars and spark-submit:  why do you provide
myawesomeapp.jar as the program jar but then include other jars via
the --jars argument?  Have you tried building one uber jar with all
dependencies and just sending that to Spark as your app jar?

Also, have you ever seen any issues with Spark caching your app jar
between runs even if it changes?

On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <se...@gmail.com> wrote:
> I don't have anything in production yet but I now at least have a
> stable (running for more than 24 hours) streaming app. Earlier, the
> app would crash for all sorts of reasons. Caveats/setup:
> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
> - Yarn for RM
> - Input and Output to Kafka
> - CDH 5.1
> - 11 node cluster with 32-cores and 48G max container size for each
> node (Yarn managed)
> - 5 partition Kafka topic - both in and out
> - Roughly, an average of 25k messages per second
> - App written in Scala (warning: I am a Scala noob)
>
> Few things I had to add/tweak to get the app to be stable:
> - The executor JVMs did not have any GC options set, by default. This
> might be more of a CDH issue. I noticed that while the Yarn container
> and other Spark ancillary tasks had GC options set at launch but none
> for the executors. So I played with different GC options and this
> worked best:
> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
> -XX:+PrintGCDetails"
>
> I tried G1GC but for some reason it just didn't work. I am not a Java
> programmer or expert so my conclusion is purely trial and error based.
> The GC logs, with these flags, go to the "stdout" file in the Yarn
> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
> specifically, even though you don't run Spark as a service (since you
> are using Yarn for RM), you can goto "Spark Client Advanced
> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
> set SPARK_JAVA_OPTS there.
>
> - Set these two params - "spark.yarn.executor.memoryOverhead"
> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
> because the executors running the kafka receivers would get killed by
> Yarn for over utilization of memory. Now, these are my memory settings
> (I will paste the entire app launch params later in the email):
> --driver-memory 2G \
> --executor-memory 16G \
> --spark.yarn.executor.memoryOverhead 4096 \
> --spark.yarn.driver.memoryOverhead 1024 \
>
> Your total executor JVM will consume "executor-memory" minus
> "spark.yarn.executor.memoryOverhead" so you should see each executor
> JVM consuming no more than 12G, in this case.
>
> Here is how I launch my app:
> run=`date +"%m-%d-%YT%T"`; \
> nohup spark-submit --class myAwesomeApp \
> --master yarn myawesomeapp.jar \
> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> \
> --driver-memory 2G \
> --executor-memory 16G \
> --executor-cores 16 \
> --num-executors 10 \
> --spark.serializer org.apache.spark.serializer.KryoSerializer \
> --spark.rdd.compress true \
> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
> --spark.akka.threads 64 \
> --spark.akka.frameSize 500 \
> --spark.task.maxFailures 64 \
> --spark.scheduler.mode FAIR \
> --spark.yarn.executor.memoryOverhead 4096 \
> --spark.yarn.driver.memoryOverhead 1024 \
> --spark.shuffle.consolidateFiles true \
> --spark.default.parallelism 528 \
>>logs/normRunLog-$run.log \
> 2>logs/normRunLogError-$run.log & \
> echo $! > logs/current-run.pid
>
> Some code optimizations (or, goof ups that I fixed). I did not
> scientifically measure the impact of each but I think they helped:
> - Made all my classes and objects serializable and then use Kryo (as
> you see above)
> - I map one receive task for each kafka partition
> - Instead of doing a "union" on all the incoming streams and then
> repartition() I now repartition() each incoming stream and process
> them separately. I believe this reduces shuffle.
> - Reduced number of repartitions. I was doing 128 after doing a
> "union" on all incoming dStreams. I now repartition each of the five
> streams separately (in a loop) to 24.
> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
> - Process data per partition instead of per RDD: dataout.foreachRDD(
> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
> - Specific to kafka: when I create "new Producer", make sure I "close"
> it else I had a ton of "too many files open" errors :)
> - Use immutable objects as far as possible. If I use mutable objects
> within a method/class then I turn them into immutable before passing
> onto another class/method.
> - For logging, create a LogService object that I then use for other
> object/class declarations. Once instantiated, I can make "logInfo"
> calls from within other Objects/Methods/Classes and output goes to the
> "stderr" file in the Yarn container logs. Good for debugging stream
> processing logic.
>
> Currently, my processing delay is lower than my dStream time window so
> all is good. I get a ton of these errors in my driver logs:
> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
> threw an exception
>
> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>
> Best I understand and have been told, this does not affect data
> integrity but may cause un-necessary recomputes.
>
> Hope this helps,
>
> Tim
>
>
> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
> <ku...@gmail.com> wrote:
>> Hmm, no response to this thread!
>>
>> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>>
>> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>>
>> ----- Original Message -----
>> From: "Tim Smith" <se...@gmail.com>
>> To: "spark users" <us...@spark.apache.org>
>> Sent: Friday, September 12, 2014 10:09:53 AM
>> Subject: Stable spark streaming app
>>
>> Hi,
>>
>> Anyone have a stable streaming app running in "production"? Can you
>> share some overview of the app and setup like number of nodes, events
>> per second, broad stream processing workflow, config highlights etc?
>>
>> Thanks,
>>
>> Tim
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Soumitra Kumar <ku...@gmail.com>.
Thanks Tim for the detailed email, would help me a lot.

I have:

. 3 nodes CDH5.1 cluster with 16G memory.

. Majority of code in Scala, some part in Java (using Cascading earlier).

. Inputs are bunch of textFileStream directories.

. Every batch output is going to Parquet files, and to HBase
  Had to set "export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar" fix some issue during class loading.
  For some reason 'spark.driver.extraClassPath' did not work.

. I started doing union pretty early in the flow, but did not work because all of my classes are not serializable. Each DStream in isolation works but if I union them early, then got into serialization issues.

. Took a while to find the log directory, /run/spark/work .

. Deploying at "--master spark://mymachine:7077"

. Modified '/etc/spark/conf.cloudera.spark/log4j.properties' for logging

. Currently I cannot process > 1G file with this configuration. I tried various things but could not succeed yet.

----- Original Message -----
From: "Tim Smith" <se...@gmail.com>
Cc: "spark users" <us...@spark.apache.org>
Sent: Wednesday, September 17, 2014 1:11:12 PM
Subject: Re: Stable spark streaming app

I don't have anything in production yet but I now at least have a
stable (running for more than 24 hours) streaming app. Earlier, the
app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each
node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This
might be more of a CDH issue. I noticed that while the Yarn container
and other Spark ancillary tasks had GC options set at launch but none
for the executors. So I played with different GC options and this
worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
-XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the "stdout" file in the Yarn
container logs on each node/worker. You can set SPARK_JAVA_OPTS in
spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
specifically, even though you don't run Spark as a service (since you
are using Yarn for RM), you can goto "Spark Client Advanced
Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
set SPARK_JAVA_OPTS there.

- Set these two params - "spark.yarn.executor.memoryOverhead"
"spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
because the executors running the kafka receivers would get killed by
Yarn for over utilization of memory. Now, these are my memory settings
(I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume "executor-memory" minus
"spark.yarn.executor.memoryOverhead" so you should see each executor
JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not
scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as
you see above)
- I map one receive task for each kafka partition
- Instead of doing a "union" on all the incoming streams and then
repartition() I now repartition() each incoming stream and process
them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a
"union" on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
- Process data per partition instead of per RDD: dataout.foreachRDD(
rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create "new Producer", make sure I "close"
it else I had a ton of "too many files open" errors :)
- Use immutable objects as far as possible. If I use mutable objects
within a method/class then I turn them into immutable before passing
onto another class/method.
- For logging, create a LogService object that I then use for other
object/class declarations. Once instantiated, I can make "logInfo"
calls from within other Objects/Methods/Classes and output goes to the
"stderr" file in the Yarn container logs. Good for debugging stream
processing logic.

Currently, my processing delay is lower than my dStream time window so
all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data
integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
<ku...@gmail.com> wrote:
> Hmm, no response to this thread!
>
> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>
> ----- Original Message -----
> From: "Tim Smith" <se...@gmail.com>
> To: "spark users" <us...@spark.apache.org>
> Sent: Friday, September 12, 2014 10:09:53 AM
> Subject: Stable spark streaming app
>
> Hi,
>
> Anyone have a stable streaming app running in "production"? Can you
> share some overview of the app and setup like number of nodes, events
> per second, broad stream processing workflow, config highlights etc?
>
> Thanks,
>
> Tim
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Tim Smith <se...@gmail.com>.
I don't have anything in production yet but I now at least have a
stable (running for more than 24 hours) streaming app. Earlier, the
app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each
node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This
might be more of a CDH issue. I noticed that while the Yarn container
and other Spark ancillary tasks had GC options set at launch but none
for the executors. So I played with different GC options and this
worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
-XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the "stdout" file in the Yarn
container logs on each node/worker. You can set SPARK_JAVA_OPTS in
spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
specifically, even though you don't run Spark as a service (since you
are using Yarn for RM), you can goto "Spark Client Advanced
Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
set SPARK_JAVA_OPTS there.

- Set these two params - "spark.yarn.executor.memoryOverhead"
"spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
because the executors running the kafka receivers would get killed by
Yarn for over utilization of memory. Now, these are my memory settings
(I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume "executor-memory" minus
"spark.yarn.executor.memoryOverhead" so you should see each executor
JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not
scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as
you see above)
- I map one receive task for each kafka partition
- Instead of doing a "union" on all the incoming streams and then
repartition() I now repartition() each incoming stream and process
them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a
"union" on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
- Process data per partition instead of per RDD: dataout.foreachRDD(
rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create "new Producer", make sure I "close"
it else I had a ton of "too many files open" errors :)
- Use immutable objects as far as possible. If I use mutable objects
within a method/class then I turn them into immutable before passing
onto another class/method.
- For logging, create a LogService object that I then use for other
object/class declarations. Once instantiated, I can make "logInfo"
calls from within other Objects/Methods/Classes and output goes to the
"stderr" file in the Yarn container logs. Good for debugging stream
processing logic.

Currently, my processing delay is lower than my dStream time window so
all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data
integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
<ku...@gmail.com> wrote:
> Hmm, no response to this thread!
>
> Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.
>
> ----- Original Message -----
> From: "Tim Smith" <se...@gmail.com>
> To: "spark users" <us...@spark.apache.org>
> Sent: Friday, September 12, 2014 10:09:53 AM
> Subject: Stable spark streaming app
>
> Hi,
>
> Anyone have a stable streaming app running in "production"? Can you
> share some overview of the app and setup like number of nodes, events
> per second, broad stream processing workflow, config highlights etc?
>
> Thanks,
>
> Tim
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Stable spark streaming app

Posted by Soumitra Kumar <ku...@gmail.com>.
Hmm, no response to this thread!

Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming.

I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark.

----- Original Message -----
From: "Tim Smith" <se...@gmail.com>
To: "spark users" <us...@spark.apache.org>
Sent: Friday, September 12, 2014 10:09:53 AM
Subject: Stable spark streaming app

Hi,

Anyone have a stable streaming app running in "production"? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org