Posted to dev@samza.apache.org by Prateek Maheshwari <pm...@linkedin.com> on 2016/10/19 17:50:46 UTC

Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53027/
-----------------------------------------------------------

Review request for samza and Xinyu Liu.


Bugs: SAMZA-1017
    https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description
-------

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by SyncRunLoop, AsyncRunLoop, and ThrottlingExecutor.
Adds a RunLoop interface, implemented by SyncRunLoop (formerly RunLoop) and AsyncRunLoop.
When AsyncRunLoop is throttled, it delays the onComplete() callback from processAsync() by an amount appropriate for the desired work factor.
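
To make the work-factor delay concrete, here is a minimal sketch of one way such a delay could be derived. It assumes the work factor is the target fraction of wall-clock time spent doing work, so that work / (work + delay) equals the work factor. The class and method names are hypothetical and are not taken from the patch.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the patch: derives an idle delay from a work factor.
public class WorkFactorDelay {

    /**
     * Returns the delay (in nanoseconds) to add after workNanos of work so that
     * work / (work + delay) == workFactor. Assumes 0 < workFactor <= 1.
     */
    public static long delayNanos(long workNanos, double workFactor) {
        if (workFactor <= 0.0 || workFactor > 1.0) {
            throw new IllegalArgumentException("workFactor must be in (0, 1]: " + workFactor);
        }
        return (long) (workNanos * (1.0 - workFactor) / workFactor);
    }

    public static void main(String[] args) {
        long work = TimeUnit.MILLISECONDS.toNanos(10); // a 10 ms processAsync() call
        System.out.println(delayNanos(work, 1.0));  // no throttling: 0
        System.out.println(delayNanos(work, 0.5));  // equal idle time: 10000000
        System.out.println(delayNanos(work, 0.25)); // three parts idle per part work: 30000000
    }
}
```

Under this assumption, a work factor of 1.0 adds no delay, and lower work factors add proportionally more idle time per unit of measured work.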

This implementation has a couple of known issues:
1. Adding delay to the process()/processAsync() callback will not throttle the run loop as long as the task processing rate exceeds the message arrival rate, e.g., a low-QPS stream with process() time < message inter-arrival time. If desirable, this can be addressed by basing the delay on the total run loop time instead of just the process() time.

2. If throttled, users can restore their original throughput by increasing task.max.concurrency and redeploying their jobs. I don't have a simple solution for this; suggestions are welcome.
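
Both issues can be illustrated with a deliberately simplified throughput model. This is a sketch under stated assumptions, not a description of how the run loop behaves: it assumes each message costs a fixed process time plus the added delay, that concurrency scales capacity linearly, and that throughput is capped by the arrival rate. The class name, method names, and numbers are hypothetical.

```java
// Hypothetical toy model, not part of the patch.
public class ThrottleModel {

    /** Delay (ms) that makes baseMs / (baseMs + delay) equal to workFactor. */
    public static double delayMs(double baseMs, double workFactor) {
        return baseMs * (1.0 - workFactor) / workFactor;
    }

    /** Messages per second: capacity with `concurrency` messages in flight, capped by arrivals. */
    public static double throughput(double arrivalPerSec, double processMs,
                                    double delayMs, int concurrency) {
        double capacityPerSec = concurrency * 1000.0 / (processMs + delayMs);
        return Math.min(arrivalPerSec, capacityPerSec);
    }

    public static void main(String[] args) {
        double processMs = 2.0;
        double delay = delayMs(processMs, 0.5); // 2 ms of delay per 2 ms of work

        // Issue 1: a low-QPS stream (100 msg/s) is not slowed by the added delay,
        // because the throttled capacity (250 msg/s) still exceeds the arrival rate.
        System.out.println(throughput(100.0, processMs, 0.0, 1));   // 100.0
        System.out.println(throughput(100.0, processMs, delay, 1)); // 100.0

        // Issue 2: on a busy stream, doubling concurrency cancels a 0.5 work factor.
        System.out.println(throughput(10_000.0, processMs, 0.0, 1));   // 500.0 unthrottled
        System.out.println(throughput(10_000.0, processMs, delay, 1)); // 250.0 throttled
        System.out.println(throughput(10_000.0, processMs, delay, 2)); // 500.0 restored
    }
}
```

In this model, basing the delay on the total loop time (10 ms per message at 100 msg/s) rather than the 2 ms process() time would push capacity below the arrival rate, which is the direction of the fix suggested in issue 1.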


Diffs
-----

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java a789d04 
  samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913de 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
-------

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari


Re: Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53027/#review153300
-----------------------------------------------------------


Ship it!


- Xinyu Liu


On Oct. 19, 2016, 6:30 p.m., Prateek Maheshwari wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53027/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2016, 6:30 p.m.)
> 
> 
> Review request for samza and Xinyu Liu.
> 
> 
> Bugs: SAMZA-1017
>     https://issues.apache.org/jira/browse/SAMZA-1017
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added disk quota based throttling to AsyncRunLoop.
> 
> Overview:
> Adds a Throttleable interface, implemented by RunLoop, AsyncRunLoop, and ThrottlingExecutor.
> When AsyncRunLoop is throttled, it delays the onComplete() callback from processAsync() by an amount appropriate for the desired work factor.
> 
> This implementation has a couple of known issues:
> 1. Adding delay to the process()/processAsync() callback will not throttle the run loop as long as the task processing rate exceeds the message arrival rate, e.g., a low-QPS stream with process() time < message inter-arrival time. If desirable, this can be addressed by basing the delay on the total run loop time instead of just the process() time.
> 
> 2. If throttled, users can restore their original throughput by increasing task.max.concurrency and redeploying their jobs. I don't have a simple solution for this; suggestions are welcome.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java a789d04 
>   samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java 21fbca2 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 77eceea 
>   samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java afcc4c5 
>   samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 3263e54 
>   samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala aa1a8d6 
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cff6b96 
> 
> Diff: https://reviews.apache.org/r/53027/diff/
> 
> 
> Testing
> -------
> 
> Tested locally with a hello world app.
> 
> 
> Thanks,
> 
> Prateek Maheshwari
> 
>


Re: Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

Posted by Prateek Maheshwari <pm...@linkedin.com>.

(Updated Oct. 19, 2016, 11:30 a.m.)


Review request for samza and Xinyu Liu.


Changes
-------

Rebased from HEAD.


Bugs: SAMZA-1017
    https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description
-------

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by RunLoop, AsyncRunLoop, and ThrottlingExecutor.
When AsyncRunLoop is throttled, it delays the onComplete() callback from processAsync() by an amount appropriate for the desired work factor.

This implementation has a couple of known issues:
1. Adding delay to the process()/processAsync() callback will not throttle the run loop as long as the task processing rate exceeds the message arrival rate, e.g., a low-QPS stream with process() time < message inter-arrival time. If desirable, this can be addressed by basing the delay on the total run loop time instead of just the process() time.

2. If throttled, users can restore their original throughput by increasing task.max.concurrency and redeploying their jobs. I don't have a simple solution for this; suggestions are welcome.


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java a789d04 
  samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 77eceea 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 3263e54 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
-------

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari


Re: Review Request 53027: SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop.

Posted by Prateek Maheshwari <pm...@linkedin.com>.

(Updated Oct. 19, 2016, 10:54 a.m.)


Review request for samza and Xinyu Liu.


Changes
-------

Updated description.


Bugs: SAMZA-1017
    https://issues.apache.org/jira/browse/SAMZA-1017


Repository: samza


Description (updated)
-------

Added disk quota based throttling to AsyncRunLoop.

Overview:
Adds a Throttleable interface, implemented by RunLoop, AsyncRunLoop, and ThrottlingExecutor.
When AsyncRunLoop is throttled, it delays the onComplete() callback from processAsync() by an amount appropriate for the desired work factor.

This implementation has a couple of known issues:
1. Adding delay to the process()/processAsync() callback will not throttle the run loop as long as the task processing rate exceeds the message arrival rate, e.g., a low-QPS stream with process() time < message inter-arrival time. If desirable, this can be addressed by basing the delay on the total run loop time instead of just the process() time.

2. If throttled, users can restore their original throughput by increasing task.max.concurrency and redeploying their jobs. I don't have a simple solution for this; suggestions are welcome.


Diffs
-----

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java a789d04 
  samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java 21fbca2 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0 
  samza-core/src/main/java/org/apache/samza/util/Throttleable.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java afcc4c5 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913de 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala aa1a8d6 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cff6b96 

Diff: https://reviews.apache.org/r/53027/diff/


Testing
-------

Tested locally with a hello world app.


Thanks,

Prateek Maheshwari