Posted to issues@spark.apache.org by "Mario Briggs (JIRA)" <ji...@apache.org> on 2016/04/14 17:33:25 UTC

[jira] [Comment Edited] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs

    [ https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241358#comment-15241358 ] 

Mario Briggs edited comment on SPARK-14597 at 4/14/16 3:32 PM:
---------------------------------------------------------------

I think there is an opportunity to merge both of your approaches above. Let me explain, using the way onOutputOperationStarted/onOutputOperationCompleted is already implemented as the model.

So rather than providing a single time metric and a single start/complete event that encompass the generateJob for all OutputStreams, you could provide a start/complete event for each individual outputstream's generateJob and, on onBatchCompleted, provide the aggregate generateJob metric for all OutputStreams. This way a user can also figure out whether an individual outputstream is the culprit.

The above would require two additional things. First, pass an eventLoop to the DStreamGraph.generateJobs() method. Second, that eventLoop should not be the existing eventLoop instance in JobGenerator, but rather a new eventLoop instance (say genJobEventLoop) in JobGenerator, because the existing JobGenerator.eventLoop instance's thread is used to actually drive job generation, and making that thread do additional work would increase latency in Streaming. This new 'genJobEventLoop' would handle GenJobStarted and GenJobCompleted events, use them to fire corresponding events to the ListenerBus, and gather the generateJob metric across all outputStreams so it can be set in the JobSet.
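
For illustration, here is a rough sketch (not tested code) of what that second event loop could look like as members added inside JobGenerator. It assumes Spark's internal EventLoop[E] helper; the GenJobStarted/GenJobCompleted case classes and the commented-out listener event names are placeholders for the events proposed above.

{code:scala}
// Sketch only: members that could be added to JobGenerator.
// GenJobStarted/GenJobCompleted and the listener event names are placeholders.
sealed trait GenJobEvent
case class GenJobStarted(outputStreamId: Int, startTime: Long) extends GenJobEvent
case class GenJobCompleted(outputStreamId: Int, endTime: Long) extends GenJobEvent

private val genJobStartTimes = new scala.collection.mutable.HashMap[Int, Long]
@volatile private var totalGenerateJobsTime = 0L  // aggregate across all output streams

// A second event loop so that the existing JobGenerator.eventLoop thread, which
// drives job generation itself, is not burdened with listener bookkeeping.
private val genJobEventLoop = new EventLoop[GenJobEvent]("gen-job-event-loop") {
  override protected def onReceive(event: GenJobEvent): Unit = event match {
    case GenJobStarted(id, start) =>
      genJobStartTimes(id) = start
      // fire the per-outputstream "started" event (placeholder listener event name):
      // listenerBus.post(StreamingListenerOutputStreamGenerateJobStarted(id, start))
    case GenJobCompleted(id, end) =>
      val elapsed = end - genJobStartTimes.getOrElse(id, end)
      totalGenerateJobsTime += elapsed  // later set on the JobSet for onBatchCompleted
      // listenerBus.post(StreamingListenerOutputStreamGenerateJobCompleted(id, end, elapsed))
  }
  override protected def onError(e: Throwable): Unit = ()  // log in a real implementation
}
genJobEventLoop.start()
{code}

DStreamGraph.generateJobs(time) would then post GenJobStarted/GenJobCompleted around each output stream's generateJob(time) call, which is what passing the eventLoop down is for.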


> Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-14597
>                 URL: https://issues.apache.org/jira/browse/SPARK-14597
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Sachin Aggarwal
>            Priority: Minor
>
> While looking to tune our streaming application, the piece of information we were looking for was the actual processing time per batch. The StreamingListener.onBatchCompleted event provides a BatchInfo object that appears to provide this information. It exposes the following data:
>  - processingDelay
>  - schedulingDelay
>  - totalDelay
>  - Submission Time
>  The above are essentially calculated from the streaming JobScheduler clocking the processingStartTime and processingEndTime for each JobSet. Another available metric is submissionTime, which is when a JobSet was put on the streaming scheduler's queue.
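> 
>  As a point of reference, a minimal sketch of reading these existing metrics from a StreamingListener (assuming an already-created StreamingContext named ssc):
> 
>  {code:scala}
>  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
> 
>  // Logs the existing BatchInfo timing metrics for every completed batch.
>  class BatchTimingListener extends StreamingListener {
>    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>      val info = batchCompleted.batchInfo
>      // schedulingDelay = processingStartTime - submissionTime
>      // processingDelay = processingEndTime - processingStartTime
>      // totalDelay      = schedulingDelay + processingDelay
>      println(s"batch=${info.batchTime} submissionTime=${info.submissionTime} " +
>        s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}ms " +
>        s"processingDelay=${info.processingDelay.getOrElse(-1L)}ms " +
>        s"totalDelay=${info.totalDelay.getOrElse(-1L)}ms")
>    }
>  }
> 
>  // ssc.addStreamingListener(new BatchTimingListener)
>  {code}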
>  
> So we took processingDelay as our actual processing time per batch. However, to maintain a stable streaming application, we found that our batch interval had to be a little less than DOUBLE the reported processingDelay metric. (We are using a DirectKafkaInputStream.) On digging further, we found that processingDelay only clocks the time spent in the foreachRDD closure of the streaming application, and that JobGenerator's graph.generateJobs (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248) method takes significantly more time.
>  Thus a true reflection of processing time is
>  a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay)
>  b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
>  c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay metric)
>  d - Time spent in Jobset's job run (existing processingDelay metric)
>  
>  Additionally, a JobGenerator queue delay (#a) could be due either to graph.generateJobs taking longer than the batch interval or to other JobGenerator events, such as checkpointing, adding to the time. Thus it would be beneficial to report the time taken by the checkpointing job as well.
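> 
>  As an illustration only (not the actual JobGenerator code), a generic timing helper shows where the extra clocking for (b) and the checkpointing time could sit; the metric names used in the comments are placeholders:
> 
>  {code:scala}
>  // Generic helper: returns the body's result together with its wall-clock duration in ms.
>  def timed[T](body: => T): (T, Long) = {
>    val start = System.currentTimeMillis()
>    val result = body
>    (result, System.currentTimeMillis() - start)
>  }
> 
>  // Inside JobGenerator (sketch only):
>  //   val (jobs, jobSetCreationDelay) = timed { graph.generateJobs(time) }        // (b)
>  //   val (_, checkpointTime)         = timed { /* DoCheckpoint(time) handling */ }
>  //   // jobSetCreationDelay and checkpointTime could then be attached to the JobSet
>  //   // and surfaced through BatchInfo in onBatchCompleted.
>  {code}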


