You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by RK <pr...@yahoo.com.INVALID> on 2014/12/30 22:41:10 UTC

Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Here is the code for my streaming job.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")sparkConf.set("spark.default.parallelism", "100")sparkConf.set("spark.shuffle.consolidateFiles", "true")sparkConf.set("spark.speculation", "true")sparkConf.set("spark.speculation.interval", "5000")sparkConf.set("spark.speculation.quantile", "0.9")sparkConf.set("spark.speculation.multiplier", "3")sparkConf.set("spark.mesos.coarse", "true")sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")sparkConf.set("spark.shuffle.manager", "SORT")
val ssc = new StreamingContext(sparkConf, Seconds(10))ssc.checkpoint(checkpointDir)
val topics = "trace"val numThreads = 1val topicMap = topics.split(",").map((_,numThreads)).toMap
val kafkaPartitions = 20val kafkaDStreams = (1 to kafkaPartitions).map { _ =>  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)}
val lines = ssc.union(kafkaDStreams)val words = lines.map(line => doSomething_1(line))val filteredWords = words.filter(word => word != "test")val groupedWords = filteredWords.map(word => (word, 1))
val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) => count > 50}val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))
ssc.start()ssc.awaitTermination()~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
I am running this job on a 9 slave AWS EC2 cluster with each slave node has 32 vCPU & 60GB memory.
When I start this job, the processing time is usually around 5 - 6 seconds for the 10 seconds batch and the scheduling delay is around 0 seconds or a few ms. However, as the job run for 6 - 8 hours, the processing time increases to 15 - 20 seconds but the scheduling delay is increasing to 4 - 6 hours.
When I look at the completed stages, I see that the time taken for getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes from around 2 seconds to more than a few minutes.
Clicking on +details next to this stage description shows the following execution trace.org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)scala.Option.orElse(Option.scala:257)org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)scala.util.Try$.apply(Try.scala:161)org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
When I click on one of these slow stages that executed after 6 - 8 hours, I find the following information for individual tasks inside.
- All tasks seem to execute with PROCESS_LOCAL locality.- Quite a few of these tasks seem to spend anywhere between 30 - 80% of their time in GC. Although, when I look at the total memory usage on each of the slave nodes under executors information, I see that the usage is only around 200MB out of 20GB available.
Even after a few hours, the map stages (val groupedWords = filteredWords.map(word => (word, 1))) seem to have consistent times as during the start of the job which seems to indicate that this code is fine.Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
Based on the information that map is as fast as during the start of job and that there is no waiting batches, I am assuming that the getCallSite stages correspond to getting data out of Kafka? Is this correct or not?If my assumption is correct, Is there anything that I could do to optimize receiving data from Kafka?If not, which part of my code needs to be optimized to reduce the scheduling delay?
Thanks,RK

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by RK <pr...@yahoo.com.INVALID>.
TD, here is the processing & GC stats after running the job for 11 hours.


Sorted all stages in desc order of duration. The getCallSite call that occurred recently tops the list at 35s even though getCallSite operation at the start of the job completed in 2s.

All the map operations stay under 1 second throughout 11 hours.

Here is more information about the longest ran getCallSite stage.





 

     On Tuesday, December 30, 2014 6:26 PM, RK <pr...@yahoo.com.INVALID> wrote:
   

 I am running the job on 1.1.1.
I will let the job run overnight and send you more info on computation vs GC time tomorrow.
BTW, do you know what the stage description named "getCallSite at DStream.scala:294" might mean?
Thanks,RK
 

     On Tuesday, December 30, 2014 6:02 PM, Tathagata Das <ta...@gmail.com> wrote:
   

 Which version of Spark Streaming are you using.

When the batch processing time increases to 15-20 seconds, could you
compare the task times compared to the tasks time when the application
is just launched? Basically is the increase from 6 seconds to 15-20
seconds is caused by increase in computation or because of GC's etc.

On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) =>
> count > 50}
> val finalResult = windowedWordsFiltered.foreachRDD(words =>
> doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9 slave AWS EC2 cluster with each slave node has
> 32 vCPU & 60GB memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10 seconds batch and the scheduling delay is around 0 seconds or a
> few ms. However, as the job run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds but the scheduling delay is increasing to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for individual tasks inside.
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. Although, when I look at the total memory usage on each of
> the slave nodes under executors information, I see that the usage is only
> around 200MB out of 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to have consistent times as
> during the start of the job which seems to indicate that this code is fine.
> Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
>
> Based on the information that map is as fast as during the start of job and
> that there is no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka? Is this correct or not?
> If my assumption is correct, Is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling
> delay?
>
> Thanks,
> RK
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org



    

   

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by RK <pr...@yahoo.com.INVALID>.
I am running the job on 1.1.1.
I will let the job run overnight and send you more info on computation vs GC time tomorrow.
BTW, do you know what the stage description named "getCallSite at DStream.scala:294" might mean?
Thanks,RK
 

     On Tuesday, December 30, 2014 6:02 PM, Tathagata Das <ta...@gmail.com> wrote:
   

 Which version of Spark Streaming are you using.

When the batch processing time increases to 15-20 seconds, could you
compare the task times compared to the tasks time when the application
is just launched? Basically is the increase from 6 seconds to 15-20
seconds is caused by increase in computation or because of GC's etc.

On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) =>
> count > 50}
> val finalResult = windowedWordsFiltered.foreachRDD(words =>
> doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9 slave AWS EC2 cluster with each slave node has
> 32 vCPU & 60GB memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10 seconds batch and the scheduling delay is around 0 seconds or a
> few ms. However, as the job run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds but the scheduling delay is increasing to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for individual tasks inside.
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. Although, when I look at the total memory usage on each of
> the slave nodes under executors information, I see that the usage is only
> around 200MB out of 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to have consistent times as
> during the start of the job which seems to indicate that this code is fine.
> Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
>
> Based on the information that map is as fast as during the start of job and
> that there is no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka? Is this correct or not?
> If my assumption is correct, Is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling
> delay?
>
> Thanks,
> RK
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org



   

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by Tathagata Das <ta...@gmail.com>.
>From the screenshots its clear that GC of 30 seconds is the culprit. If one
of the node is stuck in GC that delays the whole  stage/job/batch. Here are
some more questions.

- what is the storage levels used in the input streams and/or persisted
streams?

- could you go to the log of the executor that suffered the GC and see what
is usage of the block manager? You should find log4j lines saying
"MemoryStore: added xxx MB, free yy MB). Also if you can print GC lines, do
check the generation sizes.

- Do you see the problem if you are not doing the window operation? That is
if you do a simple filter, map, etc.? If no, can you try doing
reduceByKeyAndWindow without the "_ + _" (the other version of
reduceByKeyAndWindow) and see the problem still persists?

TD
On Jan 5, 2015 6:35 AM, "Gerard Maas" <ge...@gmail.com> wrote:

> Hi RK,
>
> Have you tried observing 'processing time' ?? You can derive it from the
> Streaming metrics by  doing (lastReceivedBatch_processEndTime -
> lastReceivedBatch_processStartTime)
> I've observed the same issue and it seems to be a sudden failure in an
> otherwise fine-running job. After several hours of execution, the job
> starts degrading in performance, ultimately resulting in failure.
>
> Here's a chart of 'processing time' over time. Each data point is the
> processing time of the 10-second job interval.
>
>
> ​
> We made a lot of progress in understanding the factors affecting the
> streaming job performance that I summarized in this post:
> http://www.virdata.com/tuning-spark/
> Yet, this one is still puzzling us. I'm currently adding GC times to the
> metrics, to see if I find a correlation.
>
> -kr, Gerard.
>
>
>
>
> On Thu, Jan 1, 2015 at 8:36 PM, RK <pr...@yahoo.com.invalid> wrote:
>
>> TD, here is the processing & GC stats after running the job for 11 hours.
>>
>> [image: Inline image]
>>
>> Sorted all stages in desc order of duration. The getCallSite call that
>> occurred recently tops the list at 35s even though getCallSite operation at
>> the start of the job completed in 2s.
>> [image: Inline image]
>>
>> All the map operations stay under 1 second throughout 11 hours.
>> [image: Inline image]
>>
>> Here is more information about the longest ran getCallSite stage.
>> [image: Inline image]
>>
>> [image: Inline image]
>>
>> [image: Inline image]
>>
>> Thanks,
>> RK
>>
>>
>>
>>   On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>
>> Which version of Spark Streaming are you using.
>>
>> When the batch processing time increases to 15-20 seconds, could you
>> compare the task times compared to the tasks time when the application
>> is just launched? Basically is the increase from 6 seconds to 15-20
>> seconds is caused by increase in computation or because of GC's etc.
>>
>> On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
>> > Here is the code for my streaming job.
>> >
>> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> > val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
>> > sparkConf.set("spark.serializer",
>> > "org.apache.spark.serializer.KryoSerializer")
>> > sparkConf.set("spark.default.parallelism", "100")
>> > sparkConf.set("spark.shuffle.consolidateFiles", "true")
>> > sparkConf.set("spark.speculation", "true")
>> > sparkConf.set("spark.speculation.interval", "5000")
>> > sparkConf.set("spark.speculation.quantile", "0.9")
>> > sparkConf.set("spark.speculation.multiplier", "3")
>> > sparkConf.set("spark.mesos.coarse", "true")
>> > sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
>> > -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
>> > sparkConf.set("spark.shuffle.manager", "SORT")
>> >
>> > val ssc = new StreamingContext(sparkConf, Seconds(10))
>> > ssc.checkpoint(checkpointDir)
>> >
>> > val topics = "trace"
>> > val numThreads = 1
>> > val topicMap = topics.split(",").map((_,numThreads)).toMap
>> >
>> > val kafkaPartitions = 20
>> > val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>> >  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>> > }
>> >
>> > val lines = ssc.union(kafkaDStreams)
>> > val words = lines.map(line => doSomething_1(line))
>> > val filteredWords = words.filter(word => word != "test")
>> > val groupedWords = filteredWords.map(word => (word, 1))
>> >
>> > val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
>> > Seconds(30), Seconds(10))
>> > val windowedWordsFiltered = windowedWordCounts.filter{case (word,
>> count) =>
>> > count > 50}
>> > val finalResult = windowedWordsFiltered.foreachRDD(words =>
>> > doSomething_2(words))
>> >
>> > ssc.start()
>> > ssc.awaitTermination()
>> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> >
>> > I am running this job on a 9 slave AWS EC2 cluster with each slave node
>> has
>> > 32 vCPU & 60GB memory.
>> >
>> > When I start this job, the processing time is usually around 5 - 6
>> seconds
>> > for the 10 seconds batch and the scheduling delay is around 0 seconds
>> or a
>> > few ms. However, as the job run for 6 - 8 hours, the processing time
>> > increases to 15 - 20 seconds but the scheduling delay is increasing to
>> 4 - 6
>> > hours.
>> >
>> > When I look at the completed stages, I see that the time taken for
>> > getCallSite at DStream.scala:294 keeps increasing as time passes by. It
>> goes
>> > from around 2 seconds to more than a few minutes.
>> >
>> > Clicking on +details next to this stage description shows the following
>> > execution trace.
>> > org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
>> >
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
>> >
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>> > scala.Option.orElse(Option.scala:257)
>> >
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
>> >
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> >
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> >
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> >
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> >
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>> >
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>> >
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>> > scala.util.Try$.apply(Try.scala:161)
>> >
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
>> > org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>> >
>> > When I click on one of these slow stages that executed after 6 - 8
>> hours, I
>> > find the following information for individual tasks inside.
>> > - All tasks seem to execute with PROCESS_LOCAL locality.
>> > - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
>> > their time in GC. Although, when I look at the total memory usage on
>> each of
>> > the slave nodes under executors information, I see that the usage is
>> only
>> > around 200MB out of 20GB available.
>> >
>> > Even after a few hours, the map stages (val groupedWords =
>> > filteredWords.map(word => (word, 1))) seem to have consistent times as
>> > during the start of the job which seems to indicate that this code is
>> fine.
>> > Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
>> >
>> > Based on the information that map is as fast as during the start of job
>> and
>> > that there is no waiting batches, I am assuming that the getCallSite
>> stages
>> > correspond to getting data out of Kafka? Is this correct or not?
>> > If my assumption is correct, Is there anything that I could do to
>> optimize
>> > receiving data from Kafka?
>> > If not, which part of my code needs to be optimized to reduce the
>> scheduling
>> > delay?
>> >
>> > Thanks,
>> > RK
>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>
>

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by Gerard Maas <ge...@gmail.com>.
Hi RK,

Have you tried observing 'processing time' ?? You can derive it from the
Streaming metrics by  doing (lastReceivedBatch_processEndTime -
lastReceivedBatch_processStartTime)
I've observed the same issue and it seems to be a sudden failure in an
otherwise fine-running job. After several hours of execution, the job
starts degrading in performance, ultimately resulting in failure.

Here's a chart of 'processing time' over time. Each data point is the
processing time of the 10-second job interval.


​
We made a lot of progress in understanding the factors affecting the
streaming job performance that I summarized in this post:
http://www.virdata.com/tuning-spark/
Yet, this one is still puzzling us. I'm currently adding GC times to the
metrics, to see if I find a correlation.

-kr, Gerard.




On Thu, Jan 1, 2015 at 8:36 PM, RK <pr...@yahoo.com.invalid> wrote:

> TD, here is the processing & GC stats after running the job for 11 hours.
>
> [image: Inline image]
>
> Sorted all stages in desc order of duration. The getCallSite call that
> occurred recently tops the list at 35s even though getCallSite operation at
> the start of the job completed in 2s.
> [image: Inline image]
>
> All the map operations stay under 1 second throughout 11 hours.
> [image: Inline image]
>
> Here is more information about the longest ran getCallSite stage.
> [image: Inline image]
>
> [image: Inline image]
>
> [image: Inline image]
>
> Thanks,
> RK
>
>
>
>   On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>
> Which version of Spark Streaming are you using.
>
> When the batch processing time increases to 15-20 seconds, could you
> compare the task times compared to the tasks time when the application
> is just launched? Basically is the increase from 6 seconds to 15-20
> seconds is caused by increase in computation or because of GC's etc.
>
> On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
> > Here is the code for my streaming job.
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> > sparkConf.set("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer")
> > sparkConf.set("spark.default.parallelism", "100")
> > sparkConf.set("spark.shuffle.consolidateFiles", "true")
> > sparkConf.set("spark.speculation", "true")
> > sparkConf.set("spark.speculation.interval", "5000")
> > sparkConf.set("spark.speculation.quantile", "0.9")
> > sparkConf.set("spark.speculation.multiplier", "3")
> > sparkConf.set("spark.mesos.coarse", "true")
> > sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> > -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> > sparkConf.set("spark.shuffle.manager", "SORT")
> >
> > val ssc = new StreamingContext(sparkConf, Seconds(10))
> > ssc.checkpoint(checkpointDir)
> >
> > val topics = "trace"
> > val numThreads = 1
> > val topicMap = topics.split(",").map((_,numThreads)).toMap
> >
> > val kafkaPartitions = 20
> > val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
> >  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> > }
> >
> > val lines = ssc.union(kafkaDStreams)
> > val words = lines.map(line => doSomething_1(line))
> > val filteredWords = words.filter(word => word != "test")
> > val groupedWords = filteredWords.map(word => (word, 1))
> >
> > val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> > Seconds(30), Seconds(10))
> > val windowedWordsFiltered = windowedWordCounts.filter{case (word, count)
> =>
> > count > 50}
> > val finalResult = windowedWordsFiltered.foreachRDD(words =>
> > doSomething_2(words))
> >
> > ssc.start()
> > ssc.awaitTermination()
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> >
> > I am running this job on a 9 slave AWS EC2 cluster with each slave node
> has
> > 32 vCPU & 60GB memory.
> >
> > When I start this job, the processing time is usually around 5 - 6
> seconds
> > for the 10 seconds batch and the scheduling delay is around 0 seconds or
> a
> > few ms. However, as the job run for 6 - 8 hours, the processing time
> > increases to 15 - 20 seconds but the scheduling delay is increasing to 4
> - 6
> > hours.
> >
> > When I look at the completed stages, I see that the time taken for
> > getCallSite at DStream.scala:294 keeps increasing as time passes by. It
> goes
> > from around 2 seconds to more than a few minutes.
> >
> > Clicking on +details next to this stage description shows the following
> > execution trace.
> > org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> > scala.Option.orElse(Option.scala:257)
> >
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> >
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> > scala.util.Try$.apply(Try.scala:161)
> >
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> > org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> >
> > When I click on one of these slow stages that executed after 6 - 8
> hours, I
> > find the following information for individual tasks inside.
> > - All tasks seem to execute with PROCESS_LOCAL locality.
> > - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> > their time in GC. Although, when I look at the total memory usage on
> each of
> > the slave nodes under executors information, I see that the usage is only
> > around 200MB out of 20GB available.
> >
> > Even after a few hours, the map stages (val groupedWords =
> > filteredWords.map(word => (word, 1))) seem to have consistent times as
> > during the start of the job which seems to indicate that this code is
> fine.
> > Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
> >
> > Based on the information that map is as fast as during the start of job
> and
> > that there is no waiting batches, I am assuming that the getCallSite
> stages
> > correspond to getting data out of Kafka? Is this correct or not?
> > If my assumption is correct, Is there anything that I could do to
> optimize
> > receiving data from Kafka?
> > If not, which part of my code needs to be optimized to reduce the
> scheduling
> > delay?
> >
> > Thanks,
> > RK
>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by RK <pr...@yahoo.com.INVALID>.
TD, here is the processing & GC stats after running the job for 11 hours.


Sorted all stages in desc order of duration. The getCallSite call that occurred recently tops the list at 35s even though getCallSite operation at the start of the job completed in 2s.

All the map operations stay under 1 second throughout 11 hours.

Here is more information about the longest ran getCallSite stage.





Thanks,RK
 

     On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <ta...@gmail.com> wrote:
   

 Which version of Spark Streaming are you using.

When the batch processing time increases to 15-20 seconds, could you
compare the task times compared to the tasks time when the application
is just launched? Basically is the increase from 6 seconds to 15-20
seconds is caused by increase in computation or because of GC's etc.

On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) =>
> count > 50}
> val finalResult = windowedWordsFiltered.foreachRDD(words =>
> doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9 slave AWS EC2 cluster with each slave node has
> 32 vCPU & 60GB memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10 seconds batch and the scheduling delay is around 0 seconds or a
> few ms. However, as the job run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds but the scheduling delay is increasing to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for individual tasks inside.
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. Although, when I look at the total memory usage on each of
> the slave nodes under executors information, I see that the usage is only
> around 200MB out of 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to have consistent times as
> during the start of the job which seems to indicate that this code is fine.
> Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
>
> Based on the information that map is as fast as during the start of job and
> that there is no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka? Is this correct or not?
> If my assumption is correct, Is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling
> delay?
>
> Thanks,
> RK
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org



   

Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)

Posted by Tathagata Das <ta...@gmail.com>.
Which version of Spark Streaming are you using.

When the batch processing time increases to 15-20 seconds, could you
compare the task times compared to the tasks time when the application
is just launched? Basically is the increase from 6 seconds to 15-20
seconds is caused by increase in computation or because of GC's etc.

On Tue, Dec 30, 2014 at 1:41 PM, RK <pr...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>   KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) =>
> count > 50}
> val finalResult = windowedWordsFiltered.foreachRDD(words =>
> doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9 slave AWS EC2 cluster with each slave node has
> 32 vCPU & 60GB memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10 seconds batch and the scheduling delay is around 0 seconds or a
> few ms. However, as the job run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds but the scheduling delay is increasing to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing as time passes by. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for individual tasks inside.
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. Although, when I look at the total memory usage on each of
> the slave nodes under executors information, I see that the usage is only
> around 200MB out of 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to have consistent times as
> during the start of the job which seems to indicate that this code is fine.
> Also, the waiting batches is either at 0 or 1 even after 8 to 10 hours.
>
> Based on the information that map is as fast as during the start of job and
> that there is no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka? Is this correct or not?
> If my assumption is correct, Is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling
> delay?
>
> Thanks,
> RK
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org