Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/04/16 03:20:24 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request, #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

lakshmi-manasa-g opened a new pull request, #1599:
URL: https://github.com/apache/samza/pull/1599

   Feature: Elasticity (SAMZA-2687) allows a Samza job to have more tasks than the number of input SystemStreamPartitions (SSPs). Thus, a job can scale up beyond its input partition count without needing to repartition the input stream.
   This PR correctly updates the last processed offsets maintained by the OffsetManager during the container's processing stage when elasticity is enabled.
   
   Changes:
   1. Modify OffsetManager.update to correctly identify the SSP even when the given ssp parameter carries no keyBucket info.
   2. Modify SystemConsumers to provide the (ssp, offset) to the Chooser only after all registrations are done. The chooser is given an SSP without a keyBucket, but a container could be processing multiple keyBuckets of the same SSP, so once registration for all of them is complete, the smallest of their offsets is given to the chooser.
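A minimal sketch of change 2, assuming numeric (Long) offsets and a simplified, hypothetical `Ssp` case class with a keyBucket field (-1 meaning "no keyBucket"). The real SystemConsumers works with string offsets compared through the system's admin, which this sketch does not model. After all keyBucket registrations are in, offsets are collapsed per SSP (keyBucket dropped) and the smallest one is kept, presumably so that no keyBucket starts past messages it still needs.

```scala
// Hypothetical, simplified SSP: keyBucket = -1 means "no keyBucket" (elasticity disabled).
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1) {
  // The SSP as the chooser sees it: same system/stream/partition, keyBucket dropped.
  def withoutKeyBucket: Ssp = copy(keyBucket = -1)
}

object ChooserRegistration {
  // Collapse per-keyBucket registrations into one starting offset per SSP,
  // keeping the smallest offset (assumes offsets are comparable Longs).
  def offsetsForChooser(registered: Map[Ssp, Long]): Map[Ssp, Long] =
    registered
      .groupBy { case (ssp, _) => ssp.withoutKeyBucket }
      .map { case (ssp, entries) => ssp -> entries.values.min }
}
```

For example, registrations of offset 7 for keyBucket 0 and offset 4 for keyBucket 1 of the same partition collapse to a single chooser entry with offset 4.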
   
   Tests: TBD
   
   API changes: No public API change.
   
   Upgrade instructions: None.
   
   Usage instructions: None.
   
   Backwards compatible: Yes. Does not affect the existing flow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on PR #1599:
URL: https://github.com/apache/samza/pull/1599#issuecomment-1127980245

   IIUC, we want to add some validation to fail if 
   elasticity-factor > 1 and broadcast-streams is enabled?
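A hedged sketch of the validation being asked about, written over a plain key/value config map. The key names are assumptions: `task.broadcast.inputs` is believed to be Samza's broadcast-input key and `job.elasticity.factor` stands in for the elasticity factor; verify both against Samza's JobConfig/TaskConfig. Whether the merged PR contains such a check is not stated here.

```scala
object ElasticityValidation {
  // Assumed config keys; check them against Samza's JobConfig/TaskConfig before relying on them.
  val ElasticityFactorKey = "job.elasticity.factor"
  val BroadcastInputsKey = "task.broadcast.inputs"

  // Throws if elasticity is enabled (factor > 1) while broadcast inputs are configured.
  def validate(config: Map[String, String]): Unit = {
    val factor = config.get(ElasticityFactorKey).map(_.trim.toInt).getOrElse(1)
    val hasBroadcast = config.get(BroadcastInputsKey).exists(_.trim.nonEmpty)
    if (factor > 1 && hasBroadcast)
      throw new IllegalArgumentException(
        s"Elasticity ($ElasticityFactorKey=$factor) is not supported with broadcast streams ($BroadcastInputsKey)")
  }
}
```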



[GitHub] [samza] rmatharu commented on a diff in pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1599:
URL: https://github.com/apache/samza/pull/1599#discussion_r864339360


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -216,9 +216,19 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+    // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
+    // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
+    // and hence exactly one entry of an ssp with keyBucket in the systemStreamPartitions map for a taskName
+    // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
+    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,

Review Comment:
   Will this work with Broadcast streams?




[GitHub] [samza] rayman7718 merged pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rayman7718 merged PR #1599:
URL: https://github.com/apache/samza/pull/1599



[GitHub] [samza] xiefan46 commented on pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on PR #1599:
URL: https://github.com/apache/samza/pull/1599#issuecomment-1107949564

   lgtm



[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1599:
URL: https://github.com/apache/samza/pull/1599#discussion_r865101103


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -216,9 +216,19 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+    // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
+    // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
+    // and hence exactly one entry of an ssp with keyBucket in the systemStreamPartitions map for a taskName
+    // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
+    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,

Review Comment:
   Is the question:
   (a) without elasticity, does this PR work with broadcast streams, OR
   (b) with elasticity enabled, does this PR work with broadcast streams?
   
   For (b):
   The expected behavior of broadcast streams may not hold with elasticity enabled. If broadcast streams are used to ensure that every message in a broadcast SSP is consumed by every task, and some processing logic in a task relies on all messages from the broadcast SSP arriving at it, that expectation could be violated, since with elasticity each task consumes only one keyBucket of the broadcast SSP. For this reason, I will update the PR description (and the elasticity JIRA description) to state that elasticity does not support broadcast streams.
   
   For (a), without elasticity: yes, it works.
   This change relies on the fact that an SSP appears exactly once in a task's model (i.e., its list of consumed SSPs) and that OffsetManager.update is called with both the taskName and the SSP.
   Even with broadcast streams, if a job consumes two partitions of a broadcast stream, each task consumes both partitions,
   but each partition of the broadcast stream is a separate SSP in the task's model (same system and stream, different partition).
   
   Let me elaborate with an example to make this clearer.
   
   Suppose the job does NOT have elasticity enabled. Say the job has one input stream with two partitions, i0 and i1, and two broadcast partitions, b0 and b1. Note that i0, i1, b0 and b1 are all SSPs, i.e., they carry system, stream and partition info.
   Let's say the job uses the GroupByPartition SSP grouper. The job model will then look like: Task0 consumes i0, b0 and b1; Task1 consumes i1, b0 and b1.
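To make this job model concrete, a minimal sketch of grouping by partition with broadcast SSPs appended to every task. It is a simplified stand-in, not Samza's actual GroupByPartition grouper; the `Ssp` case class, `GroupByPartitionSketch` and the `Task<n>` naming are hypothetical and only mirror the example above.

```scala
// Hypothetical, simplified SSP without keyBucket (elasticity disabled).
final case class Ssp(system: String, stream: String, partition: Int)

object GroupByPartitionSketch {
  // Group regular inputs by partition id; every task additionally consumes all broadcast SSPs.
  def group(inputs: Set[Ssp], broadcast: Set[Ssp]): Map[String, Set[Ssp]] =
    inputs
      .groupBy(_.partition)
      .map { case (partition, ssps) => s"Task$partition" -> (ssps ++ broadcast) }
}
```

Each broadcast partition stays a distinct SSP inside every task's set, which is why a lookup by system, stream and partition remains unambiguous.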
   
   Prior to this change, update(taskName, ssp, offset) did:
   lastProcessedOffsets.get(taskName).put(ssp, offset)
   
   Now, update(taskName, ssp, offset) does:
   fetched-ssp = the SSP in ssps-consumed-by-taskName whose system, stream and partition match the given ssp
   lastProcessedOffsets.get(taskName).put(fetched-ssp, offset)
   
   So if update(Task0, i0, 1) comes in, the new code searches the list <i0, b0, b1> for the SSP with matching system, stream and partition, and finds i0.
   The same happens for update(Task0, b0, 2) for the broadcast input, and so on.
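A minimal Scala sketch of the before/after logic, assuming a simplified `Ssp` case class with an optional keyBucket (-1 meaning none) and in-memory maps standing in for OffsetManager's `systemStreamPartitions` and `lastProcessedOffsets`. `OffsetManagerSketch`, `updateOld` and `samePartitionAs` are hypothetical names; this is not the merged implementation, and the sketch simply fails fast when no matching SSP exists.

```scala
import scala.collection.mutable

// Simplified SSP; keyBucket = -1 means no keyBucket (elasticity disabled).
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1) {
  // True if both refer to the same system, stream and partition, ignoring keyBucket.
  def samePartitionAs(other: Ssp): Boolean =
    system == other.system && stream == other.stream && partition == other.partition
}

class OffsetManagerSketch(systemStreamPartitions: Map[String, Set[Ssp]]) {
  val lastProcessedOffsets: mutable.Map[String, mutable.Map[Ssp, String]] = mutable.Map.empty

  // Before: the given ssp was used as the key directly.
  def updateOld(taskName: String, ssp: Ssp, offset: String): Unit =
    lastProcessedOffsets.getOrElseUpdate(taskName, mutable.Map.empty).put(ssp, offset)

  // After: key by the SSP the task actually consumes (which may carry a keyBucket).
  def update(taskName: String, ssp: Ssp, offset: String): Unit = {
    val consumed = systemStreamPartitions.getOrElse(taskName, Set.empty[Ssp])
    val fetched = consumed.find(_.samePartitionAs(ssp)).getOrElse(
      throw new IllegalStateException(s"$ssp is not consumed by $taskName"))
    lastProcessedOffsets.getOrElseUpdate(taskName, mutable.Map.empty).put(fetched, offset)
  }
}
```

Without elasticity the fetched SSP equals the given one, so behavior matches the old code, including for broadcast partitions.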
   
   Now suppose this job enables elasticity with elasticity.factor = 2 and keeps the same grouper.
   The job model will look like: Task0_0_2 consumes <(i0,0), (b0,0), (b1,0)> // where (i0,0) means keyBucket 0 of SSP i0.
   Task0_1_2 consumes <(i0,1), (b0,1), (b1,1)> // where (i0,1) means keyBucket 1 of SSP i0.
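A sketch of how this elastic job model can be derived from the non-elastic one, assuming the `Task<n>_<keyBucket>_<factor>` naming shown above and a simplified `Ssp` case class. `ElasticitySketch.expand` is a hypothetical helper, not Samza's grouper code.

```scala
// Hypothetical, simplified SSP; keyBucket = -1 means no keyBucket.
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

object ElasticitySketch {
  // Split each task into `factor` tasks; task k consumes keyBucket k of every SSP of the original task.
  def expand(model: Map[String, Set[Ssp]], factor: Int): Map[String, Set[Ssp]] =
    model.flatMap { case (task, ssps) =>
      (0 until factor).map(k => s"${task}_${k}_$factor" -> ssps.map(_.copy(keyBucket = k)))
    }
}
```

Each elastic task still holds exactly one entry per (system, stream, partition), now tagged with its keyBucket.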
   
   Now, when Task0_0_2 finishes processing, say, offset 4 of i0 (a regular, non-broadcast input), update(Task0_0_2, i0, 4) is called.
   When looking up i0 in Task0_0_2's list of SSPs, (i0,0) is found, and the offset is recorded in lastProcessedOffsets under (i0,0).
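The keyBucket lookup on its own, as a self-contained sketch under the same assumptions (simplified `Ssp` with a keyBucket field, an in-memory map standing in for each task's consumed SSPs; `KeyBucketLookup.resolve` is a hypothetical name). The incoming SSP carries no keyBucket, and matching on system, stream and partition alone picks out the single keyBucketed SSP the task owns.

```scala
// Hypothetical, simplified SSP; keyBucket = -1 means no keyBucket.
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

object KeyBucketLookup {
  // Find the SSP (possibly with keyBucket) consumed by the task that matches
  // the given SSP on system, stream and partition; the keyBucket is ignored.
  def resolve(consumed: Map[String, Set[Ssp]], taskName: String, ssp: Ssp): Option[Ssp] =
    consumed.getOrElse(taskName, Set.empty[Ssp]).find { c =>
      c.system == ssp.system && c.stream == ssp.stream && c.partition == ssp.partition
    }
}
```

The single match relies on the invariant stated in the diff comment: a task consumes exactly one keyBucket of any given SSP.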
   
   The same applies to the other grouper, GroupBySystemStreamPartition.


