Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/08/15 17:54:18 UTC

[jira] [Commented] (SAMZA-384) TaskInstance.send is slow with high task count

    [ https://issues.apache.org/jira/browse/SAMZA-384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14098693#comment-14098693 ] 

Chris Riccomini commented on SAMZA-384:
---------------------------------------

Also of note: deep in the stack trace, the call that actually accounts for the 20% CPU is Logger.isTraceEnabled(), which calls [Category.getEffectiveLevel()|http://grepcode.com/file/repository.springsource.com/org.apache.log4j/com.springsource.org.apache.log4j/1.2.15/org/apache/log4j/Category.java#Category.getEffectiveLevel%28%29]. Both are Log4J methods. The getEffectiveLevel call appears to walk the logger hierarchy in a for loop.
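
To illustrate why a level check can cost more than a field read, here is a simplified model of that kind of lookup. It is not the Log4J source: Node, effectiveLevel, and the logger names are hypothetical, and the chain depth is picked only for illustration. The assumption is that a logger with no level of its own falls back to its nearest ancestor that has one.

```scala
object EffectiveLevelSketch {
  // Hypothetical stand-in for a logger node; NOT Log4J's Category class.
  final case class Node(name: String, level: Option[String], parent: Option[Node])

  // Walk from the given node towards the root until a node with an explicit
  // level is found. Returns that level plus the number of nodes visited.
  @annotation.tailrec
  def effectiveLevel(node: Node, visited: Int = 1): (Option[String], Int) =
    node.level match {
      case Some(l) => (Some(l), visited)
      case None =>
        node.parent match {
          case Some(p) => effectiveLevel(p, visited + 1)
          case None    => (None, visited)
        }
    }

  // Illustrative chain: only the root has a level configured.
  val root = Node("root", Some("INFO"), None)
  val org = Node("org", None, Some(root))
  val apache = Node("org.apache", None, Some(org))
  val samza = Node("org.apache.samza", None, Some(apache))
  val container = Node("org.apache.samza.container", None, Some(samza))
  val taskInstance = Node("org.apache.samza.container.TaskInstance", None, Some(container))
}
```

In this model, every level check on taskInstance visits six nodes before it reaches the root's level. A walk like this is cheap in isolation, but it is repeated for every TaskInstance on every pass through the run loop.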

> TaskInstance.send is slow with high task count
> ----------------------------------------------
>
>                 Key: SAMZA-384
>                 URL: https://issues.apache.org/jira/browse/SAMZA-384
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.8.0
>
>
> I have a job that runs with ~235 TaskInstances per container. I'm seeing the SamzaContainer spend about 20% of its CPU time on a trace() logging call inside TaskInstance.send. The code for this method is:
> {code}
>   def send {
>     if (collector.envelopes.size > 0) {
>       trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size))
>       metrics.sends.inc
>       metrics.messagesSent.inc(collector.envelopes.size)
>       collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
>       trace("Resetting collector for taskName: %s" format taskName)
>       collector.reset
>     } else {
>       trace("Skipping send for taskName %s because no messages were collected." format taskName)
>       metrics.sendsSkipped.inc
>     }
>   }
> {code}
> This method is invoked from the RunLoop.send method:
> {code}
>   private def send {
>     updateTimer(metrics.sendMs) {
>       trace("Triggering send in task instances.")
>       metrics.sends.inc
>       taskInstances.values.foreach(_.send)
>     }
>   }
> {code}
> So, I believe the problem here is that every send() invocation in the RunLoop ends up running 235 send() calls on my 235 TaskInstances.
> Since the RunLoop doesn't know which TaskInstances actually have messages to send, it has to call send() on all of them. I took a look at my metrics, and the vast majority of the time, the TaskInstance.send method is just skipping the send call (metrics.sendsSkipped.inc), so this is totally wasted time.
> The easiest solution here is to remove the trace call when there are no outgoing messages in TaskInstance.send.
> Another solution would be to modify the RunLoop/TaskInstance in such a way that the RunLoop would know which TaskInstances it needs to call send() on, and *only* call send() on those TaskInstances. Presumably, this would have to be done with a callback or something.
> I took a look at the RunLoop, and of the four tight-loop methods (process, send, window, and commit), the only one that iterates over all TaskInstances on every invocation is send(). The rest are either time-bounded (e.g. once every 60 seconds), or only call methods on a single TaskInstance (process). My inclination is to just remove this log line in TaskInstance.send then, rather than refactoring the code.
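
For reference, the callback-based alternative described above could look roughly like the sketch below. This is not Samza code: Task, Loop, collect, register, and onFirstMessage are hypothetical names, and envelopes are modeled as plain strings. The idea is that a task notifies the loop when its collector goes from empty to non-empty, so the loop only visits tasks that have something to send.

```scala
import scala.collection.mutable

object PendingSendSketch {
  // Hypothetical stand-in for a TaskInstance. `onFirstMessage` is the callback
  // that tells the loop this task now has outgoing messages.
  final class Task(val name: String, onFirstMessage: Task => Unit) {
    private val envelopes = mutable.ArrayBuffer.empty[String]
    var sends = 0

    def collect(envelope: String): Unit = {
      // Notify only on the empty -> non-empty transition.
      if (envelopes.isEmpty) onFirstMessage(this)
      envelopes += envelope
    }

    // Flushes the collected envelopes and returns how many were sent.
    def send(): Int = {
      val sent = envelopes.size
      envelopes.clear()
      sends += 1
      sent
    }
  }

  // Hypothetical stand-in for the RunLoop's send step.
  final class Loop {
    private val pending = mutable.LinkedHashSet.empty[Task]

    def register(name: String): Task =
      new Task(name, t => { pending += t; () })

    // Calls send() only on tasks that reported messages, then forgets them.
    // Returns the total number of messages sent in this pass.
    def send(): Int = {
      val total = pending.iterator.map(_.send()).sum
      pending.clear()
      total
    }
  }
}
```

With 235 registered tasks of which only two have collected messages, Loop.send() touches just those two. The cost is extra bookkeeping on every collect call, which is part of why simply dropping the trace line is the smaller change.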


