Posted to dev@samza.apache.org by Chris Riccomini <cr...@apache.org> on 2014/04/04 03:04:58 UTC

Review Request 20022: SAMZA-220

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

bump default max msgs per stream partition up to 10k


add an unprocessed message counter


black list empty system stream partitions


Diffs
-----

  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini


Re: Review Request 20022: SAMZA-220

Posted by Jakob Homan <jg...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review40687
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala
<https://reviews.apache.org/r/20022/#comment73784>

    This class effectively wraps the chooser. Would it be better to have it extend the chooser, i.e. a BufferingMessageChooser?
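Jakob's suggestion amounts to the decorator pattern: the buffer becomes a chooser in its own right that wraps another chooser. The sketch below is a minimal illustration under stated assumptions. `Chooser`, `Envelope`, `FifoChooser` and `BufferingChooser` are hypothetical, simplified stand-ins; they are not Samza's actual MessageChooser or IncomingMessageEnvelope APIs, which have more methods and fields. The assumed buffering policy is that the wrapped chooser holds at most one envelope per SSP at a time.

```scala
import scala.collection.mutable

// Hypothetical, simplified envelope (the real Samza type carries more fields).
final case class Envelope(ssp: String, payload: String)

// Hypothetical, simplified chooser interface (not Samza's actual MessageChooser).
trait Chooser {
  def update(envelope: Envelope): Unit
  def choose(): Option[Envelope]
}

// A trivial inner chooser: first in, first out over whatever it has been given.
class FifoChooser extends Chooser {
  private val queue = mutable.Queue.empty[Envelope]
  def update(envelope: Envelope): Unit = queue.enqueue(envelope)
  def choose(): Option[Envelope] = if (queue.isEmpty) None else Some(queue.dequeue())
}

// Decorator: buffers envelopes per SSP and feeds the wrapped chooser at most
// one envelope per SSP at a time, while exposing the same Chooser interface.
class BufferingChooser(wrapped: Chooser) extends Chooser {
  private val buffers = mutable.Map.empty[String, mutable.Queue[Envelope]]
  private val inWrapped = mutable.Set.empty[String]

  def update(envelope: Envelope): Unit = {
    buffers.getOrElseUpdate(envelope.ssp, mutable.Queue.empty[Envelope]).enqueue(envelope)
    flush(envelope.ssp)
  }

  def choose(): Option[Envelope] = {
    val chosen = wrapped.choose()
    chosen.foreach { e =>
      inWrapped -= e.ssp
      flush(e.ssp) // refill the wrapped chooser from this SSP's buffer, if possible
    }
    chosen
  }

  // Hand the wrapped chooser the next buffered envelope for ssp, unless it
  // already holds one from that SSP.
  private def flush(ssp: String): Unit = {
    val buffer = buffers.getOrElse(ssp, mutable.Queue.empty[Envelope])
    if (!inWrapped.contains(ssp) && buffer.nonEmpty) {
      wrapped.update(buffer.dequeue())
      inWrapped += ssp
    }
  }
}
```

Because callers only see the Chooser interface, code like SystemConsumers would not need to know whether it is talking to a plain chooser or a buffering one.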



samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala
<https://reviews.apache.org/r/20022/#comment73787>

    Can we add return types to aid in readability?



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment73779>

    Typo: this -> thus



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment73781>

    This in combination with depletedQueueSizeThreshold is confusing.


- Jakob Homan


On April 17, 2014, 10:30 a.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 17, 2014, 10:30 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> move coordinator to buffer, and add javadocs
> 
> 
> move unprocessed messages into coordinator
> 
> 
> adding a message chooser coordinator to try and extract some of the confusing logic from system consumers into a separate class
> 
> 
> don't hard code the refresh threshold
> 
> 
> bump default max msgs per stream partition up to 10k
> 
> 
> add an unprocessed message counter
> 
> 
> black list empty system stream partitions
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>


Re: Review Request 20022: SAMZA-220

Posted by Jakob Homan <jg...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review41065
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
<https://reviews.apache.org/r/20022/#comment74449>

    Needs header.



samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
<https://reviews.apache.org/r/20022/#comment74450>

    header
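The "needs header" comments refer to the license header that Apache projects place at the top of each source file. For reference, the standard ASF source header reads as follows (shown here as a Scala block comment; the project's existing files are the authority on the exact form Samza uses):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
```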


- Jakob Homan


On April 21, 2014, 2:08 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 21, 2014, 2:08 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> test for buffering chooser
> 
> 
> picking up where i left off with SAMZA-220
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

(Updated April 21, 2014, 9:08 p.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

test for buffering chooser


picking up where i left off with SAMZA-220


Diffs (updated)
-----

  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
  samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

(Updated April 17, 2014, 8:35 p.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

rename message chooser buffer to buffering message chooser


remove depleted queue threshold and fetch threshold pct


add metrics back in


more javadocs


move coordinator to buffer, and add javadocs


move unprocessed messages into coordinator


adding a message chooser coordinator to try and extract some of the confusing logic from system consumers into a separate class


don't hard code the refresh threshold


bump default max msgs per stream partition up to 10k


add an unprocessed message counter


black list empty system stream partitions


Diffs (updated)
-----

  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
  samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

(Updated April 17, 2014, 5:30 p.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

move coordinator to buffer, and add javadocs


move unprocessed messages into coordinator


adding a message chooser coordinator to try and extract some of the confusing logic from system consumers into a separate class


don't hard code the refresh threshold


bump default max msgs per stream partition up to 10k


add an unprocessed message counter


black list empty system stream partitions


Diffs (updated)
-----

  samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

(Updated April 17, 2014, 12:48 a.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

adding a message chooser coordinator to try and extract some of the confusing logic from system consumers into a separate class


don't hard code the refresh threshold


bump default max msgs per stream partition up to 10k


add an unprocessed message counter


black list empty system stream partitions


Diffs (updated)
-----

  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
  samza-core/src/main/scala/org/apache/samza/system/chooser/MessageChooserCoordinator.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.

> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 97
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line97>
> >
> >     I would find the documentation more helpful if it didn't describe what the code does to this variable (I can find that out by reading the code itself), but rather *why* it exists and what problem it solves.

I tried to describe more about "why" in the latest patch.


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 166
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line166>
> >
> >     Not clear whether you mean that maxMsgsPerStreamPartition defaults to 1000, or refreshThreshold defaults to 1000.

Updated to be explicit.


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 174
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line174>
> >
> >     I think this is a slight change of semantics. The previous computation returned the number of messages that had not yet been given to the chooser, whereas totalUnprocessedMessages is only decremented after the chooser has chosen the message. Don't think that's a problem, just wanted to point it out in case it wasn't deliberate.

Yes, it's deliberate. I didn't think it was a big deal, though.


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 300
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line300>
> >
> >     I have difficulty convincing myself that this logic is correct -- is it always the case that the partition is neededByChooser in this case? It's probably correct, but it's very subtle logic that's easy to get wrong. Would it be possible to express this in a way that's easier to reason about?
> >     
> >     For example, mapping each systemStreamPartition to one of enum { IN_CHOOSER, NEEDED_BY_CHOOSER, SKIPPING_CHOOSER } would make clear that each partition is in exactly one of those three states at any one time.

Moved the logic into MessageChooserBuffer. Hopefully this makes things a little clearer.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


On April 17, 2014, 5:30 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 17, 2014, 5:30 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> move coordinator to buffer, and add javadocs
> 
> 
> move unprocessed messages into coordinator
> 
> 
> adding a message chooser coordinator to try and extract some of the confusing logic from system consumers into a separate class
> 
> 
> don't hard code the refresh threshold
> 
> 
> bump default max msgs per stream partition up to 10k
> 
> 
> add an unprocessed message counter
> 
> 
> black list empty system stream partitions
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.

> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 168
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line168>
> >
> >     At first I didn't understand how this was different from fetchThresholdPct (had to read the code to understand that fetchThresholdPct was per topic-partition and refreshThreshold was for total unprocessed messages). I wonder if this could be made clearer somehow, or even folded into a single parameter.

I think we might be able to completely eliminate fetchThresholdPct and depletedQueueSizeThreshold.

The refresh.maybeCall method is only triggered if buffer.totalUnprocessedMessages <= refreshThreshold. The default refreshThreshold is 1000, which means that a refresh is triggered as soon as there are 1000 or fewer total unprocessed messages. The default fetchThresholdPct is 0, so depletedQueueSizeThreshold is set to 10000; as a result, fetches for any given SSP are ONLY triggered when the SSP's queue is completely empty.

If we eliminate the fetch threshold and depleted queue size, and just fetch whenever an SSP's queue size is < refreshThreshold, I think we should be just as well off. In this scenario, a refresh is triggered when there are 1000 or fewer total unprocessed messages (by default). In turn, each individual SSP is added to the fetch request when its queue size is < 1000. Thus, in the worst case of a single SSP in the container, that SSP will still be fetched every time maybeCall is invoked, and maybeCall will only be invoked when its queue has been depleted to <= 10% of capacity.
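The proposed single-threshold rule can be sketched as follows. This is a hypothetical model, not the SystemConsumers code: `FetchPolicy`, `shouldRefresh` and `sspsToFetch` are illustrative names, and queue sizes are modeled as a plain `Map[String, Int]` from SSP name to buffered count. Only the default values (10000 and 1000) come from the thread; the comparison operators are assumptions.

```scala
// Hypothetical sketch of the proposed rule; names are illustrative only.
object FetchPolicy {
  val maxMsgsPerStreamPartition: Int = 10000 // default per-SSP buffer cap, per the thread
  val refreshThreshold: Int = 1000           // default total-buffered trigger, per the thread

  // Fraction of one SSP's capacity at which a refresh can fire (1000 / 10000).
  val refreshFraction: Double = refreshThreshold.toDouble / maxMsgsPerStreamPartition

  // A refresh is attempted only when the total number of buffered messages
  // across all SSPs has dropped to refreshThreshold or below.
  def shouldRefresh(queueSizes: Map[String, Int]): Boolean =
    queueSizes.values.sum <= refreshThreshold

  // Proposed rule: during a refresh, fetch every SSP whose own queue is at or
  // below refreshThreshold; there is no separate per-SSP fetch threshold.
  def sspsToFetch(queueSizes: Map[String, Int]): Set[String] =
    if (!shouldRefresh(queueSizes)) Set.empty
    else queueSizes.collect { case (ssp, size) if size <= refreshThreshold => ssp }.toSet
}
```

Under these assumptions the per-SSP test always passes once shouldRefresh is true, since no single queue can exceed the total; that is what guarantees the single-SSP worst case is fetched on every refresh. With the strict `<` written above, a lone SSP holding exactly refreshThreshold messages would trigger a refresh without being fetched, which is why the sketch uses `<=` in both places.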


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


Re: Review Request 20022: SAMZA-220

Posted by Martin Kleppmann <mk...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72019>

    Is it worth exposing this property as configurable in the job properties? Since SystemConsumers tries to actually keep this number of messages per partition in memory, a job that uses large messages or many partitions may find itself using a lot of memory. For example, if messages are 100kB each, this default will use up to 1 GB per partition.
    
    Or perhaps we can find some way of auto-tuning this parameter based on the number of partitions and the average message size, so as to use a fixed amount of memory per container for unprocessed messages.
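Martin's arithmetic and his auto-tuning idea can be made concrete with a small sketch. `BufferSizing`, `worstCaseBytes` and `autoTunedMaxMsgs` are hypothetical names, and the policy (split a fixed per-container byte budget evenly across partitions, divide by the average message size, clamp to at least 1 and at most the 10000 default) is one possible reading of the suggestion, not an existing Samza feature.

```scala
// Hypothetical sizing helpers; not part of Samza.
object BufferSizing {
  // Upper bound on buffered bytes if every partition's buffer is full.
  def worstCaseBytes(maxMsgsPerStreamPartition: Int, avgMessageBytes: Long, partitions: Int): Long =
    maxMsgsPerStreamPartition.toLong * avgMessageBytes * partitions

  // Assumed auto-tuning policy: divide a fixed per-container budget evenly
  // across partitions, then by the average message size; clamp to [1, cap].
  def autoTunedMaxMsgs(budgetBytes: Long, avgMessageBytes: Long, partitions: Int, cap: Int = 10000): Int = {
    require(avgMessageBytes > 0 && partitions > 0, "message size and partition count must be positive")
    val perPartitionBytes = budgetBytes / partitions
    val msgs = perPartitionBytes / avgMessageBytes
    math.max(1L, math.min(cap.toLong, msgs)).toInt
  }
}
```

For Martin's example, worstCaseBytes(10000, 100000, 1) is 1,000,000,000 bytes, i.e. 1 GB (decimal units) for a single partition. With a 1 GB budget spread over 10 partitions of 100 kB messages, autoTunedMaxMsgs would allow 1000 messages per partition.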



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72024>

    I would find the documentation more helpful if it didn't describe what the code does to this variable (I can find that out by reading the code itself), but rather *why* it exists and what problem it solves.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72020>

    Not clear whether you mean that maxMsgsPerStreamPartition defaults to 1000, or refreshThreshold defaults to 1000.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72021>

    At first I didn't understand how this was different from fetchThresholdPct (had to read the code to understand that fetchThresholdPct was per topic-partition and refreshThreshold was for total unprocessed messages). I wonder if this could be made clearer somehow, or even folded into a single parameter.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72022>

    I think this is a slight change of semantics. The previous computation returned the number of messages that had not yet been given to the chooser, whereas totalUnprocessedMessages is only decremented after the chooser has chosen the message. Don't think that's a problem, just wanted to point it out in case it wasn't deliberate.
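The semantic difference Martin points out can be modeled with two counters. This is a hypothetical illustration (the `UnprocessedCounters` class and its methods are not Samza code): the old-style count covers messages not yet handed to the chooser, while a totalUnprocessedMessages-style count is only decremented once the chooser picks a message, so it also includes whatever is sitting inside the chooser.

```scala
import scala.collection.mutable

// Hypothetical model of the two counting semantics; not Samza code.
class UnprocessedCounters {
  private val buffered = mutable.Queue.empty[String] // fetched, not yet handed to the chooser
  private var inChooser = 0                           // handed to the chooser, not yet chosen
  private var totalUnprocessedMessages = 0            // decremented only when a message is chosen

  def fetched(msg: String): Unit = { buffered.enqueue(msg); totalUnprocessedMessages += 1 }
  def handToChooser(): Unit = if (buffered.nonEmpty) { buffered.dequeue(); inChooser += 1 }
  def chosen(): Unit = if (inChooser > 0) { inChooser -= 1; totalUnprocessedMessages -= 1 }

  // Previous semantics: messages not yet given to the chooser.
  def notYetGivenToChooser: Int = buffered.size
  // New semantics: messages not yet chosen (includes those held by the chooser).
  def unprocessed: Int = totalUnprocessedMessages
}
```

The two counts differ by exactly the number of messages held by the chooser, which is bounded by the number of SSPs if the chooser holds at most one message per SSP; that is consistent with the view that the change is harmless.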



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/20022/#comment72029>

    I have difficulty convincing myself that this logic is correct -- is it always the case that the partition is neededByChooser in this case? It's probably correct, but it's very subtle logic that's easy to get wrong. Would it be possible to express this in a way that's easier to reason about?
    
    For example, mapping each systemStreamPartition to one of enum { IN_CHOOSER, NEEDED_BY_CHOOSER, SKIPPING_CHOOSER } would make clear that each partition is in exactly one of those three states at any one time.
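Martin's three-state suggestion could be expressed in Scala as a sealed trait with case objects, keyed by SSP in a single map so that each SSP has exactly one state at any time. The state names follow his comment; the per-state comments are one plausible reading, and `SspStates` is a hypothetical helper, not the structure the patch actually adopted.

```scala
// Hypothetical sketch; each SSP maps to exactly one of three states.
sealed trait ChooserState
case object InChooser extends ChooserState       // an envelope from this SSP is held by the chooser
case object NeededByChooser extends ChooserState // the chooser holds nothing from this SSP and wants one
case object SkippingChooser extends ChooserState // nothing buffered for this SSP; don't wait on it

final class SspStates {
  // A single map means setting a new state implicitly clears the old one.
  private var states = Map.empty[String, ChooserState]

  def set(ssp: String, state: ChooserState): Unit = states += (ssp -> state)
  def stateOf(ssp: String): Option[ChooserState] = states.get(ssp)
  def inState(state: ChooserState): Set[String] =
    states.collect { case (ssp, s) if s == state => ssp }.toSet
}
```

Compared with separate sets or flags per condition, the map makes the "exactly one of three states" invariant hold by construction, and a `match` on the sealed trait lets the compiler flag unhandled states.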


- Martin Kleppmann


On April 4, 2014, 1:09 a.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 4, 2014, 1:09 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> don't hard code the refresh threshold
> 
> 
> bump default max msgs per stream partition up to 10k
> 
> 
> add an unprocessed message counter
> 
> 
> black list empty system stream partitions
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>


Re: Review Request 20022: SAMZA-220

Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/
-----------------------------------------------------------

(Updated April 4, 2014, 1:09 a.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

don't hard code the refresh threshold


bump default max msgs per stream partition up to 10k


add an unprocessed message counter


black list empty system stream partitions


Diffs (updated)
-----

  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 

Diff: https://reviews.apache.org/r/20022/diff/


Testing
-------


Thanks,

Chris Riccomini