Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/05/04 17:33:53 UTC

[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1599: SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled

lakshmi-manasa-g commented on code in PR #1599:
URL: https://github.com/apache/samza/pull/1599#discussion_r865101103


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -216,9 +216,19 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+    // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
+    // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
+    // and hence exactly one entry of an ssp with keyBucket in in the systemStreamPartitions map for a taskName
+    // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
+    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,

Review Comment:
   Is the question:
   (a) without elasticity, does this PR work with broadcast streams, OR
   (b) WITH elasticity, does this PR work with broadcast streams?
   
   For (b):
   The expected broadcast stream behavior may not hold with elasticity enabled.
   That is, if a broadcast stream was used to ensure that every message in a broadcast ssp is consumed by all tasks, and some processing logic in a task relies on all messages from the broadcast ssp arriving at it, then that logic could be incorrect. For this reason, let me update the PR description (and the elasticity JIRA description) to say that elasticity does not support broadcast streams.
   
   For (a), without elasticity: yes, it works.
   This change relies on two facts: an ssp is present in a task's model (aka the list of ssps it consumes) exactly once, and OffsetManager.update is called with the taskName and the ssp.
   Even with broadcast streams, if a job consumes two partitions of a broadcast stream, then a task will consume both partitions.
   But each partition of the broadcast stream is a separate ssp in the task's model (same system and stream but a different partition).
   
   Let me elaborate with an example to make this clearer.
   
   Suppose the job has NO elasticity enabled. Say the job has one input stream with two partitions i0, i1 and two broadcast partitions b0, b1. Note that i0, i1, b0, b1 are all SSPs: each carries a system, a stream and a partition.
   Let's say the job uses the GroupByPartition SSP grouper. The job model will look like this: Task0 consumes i0, b0 and b1. Task1 consumes i1, b0 and b1.
   
   Prior to this change:
   update(task, ssp, offset) did
   lastProcessedOffsets.get(taskName).put(ssp, offset)
   
   Now:
   update(task, ssp, offset) does
   fetched-ssp = ssp-consumed-by-taskName.get(input-ssp such that input-ssp.system, stream, partition match the given ssp)
   lastProcessedOffsets.get(taskName).put(fetched-ssp, offset)
   
   So if update(Task0, i0, 1) comes in, the new code searches the list <i0, b0, b1> for the ssp with matching system, stream and partition, and finds i0.
   The same holds for update(Task0, b0, 2) on the broadcast input, and so on.
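
The lookup described above can be sketched roughly as follows. This is a minimal, self-contained model and not the actual OffsetManager code: `Ssp`, `OffsetSketch`, the "kafka", "input" and "bcast" names, and the choice to silently ignore an ssp that the task does not consume are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Samza's SystemStreamPartition (system, stream, partition only).
final case class Ssp(system: String, stream: String, partition: Int)

// Hypothetical, simplified model of the update path; not the real OffsetManager.
class OffsetSketch(taskModel: Map[String, Set[Ssp]]) {
  val lastProcessedOffsets = mutable.Map.empty[String, mutable.Map[Ssp, String]]

  def update(taskName: String, ssp: Ssp, offset: String): Unit = {
    // Find the ssp in the task's model whose system, stream and partition match.
    val fetched = taskModel.getOrElse(taskName, Set.empty[Ssp]).find { s =>
      s.system == ssp.system && s.stream == ssp.stream && s.partition == ssp.partition
    }
    // Assumption: an ssp that the task does not consume is ignored.
    fetched.foreach { s =>
      lastProcessedOffsets.getOrElseUpdate(taskName, mutable.Map.empty[Ssp, String]).put(s, offset)
    }
  }
}

object NoElasticityExample {
  val i0 = Ssp("kafka", "input", 0)
  val i1 = Ssp("kafka", "input", 1)
  val b0 = Ssp("kafka", "bcast", 0)
  val b1 = Ssp("kafka", "bcast", 1)

  def main(args: Array[String]): Unit = {
    // Job model from the example: GroupByPartition, no elasticity.
    val sketch = new OffsetSketch(Map("Task0" -> Set(i0, b0, b1), "Task1" -> Set(i1, b0, b1)))
    sketch.update("Task0", i0, "1") // regular input partition
    sketch.update("Task0", b0, "2") // broadcast partition, handled the same way
    println(sketch.lastProcessedOffsets("Task0"))
  }
}
```

Without elasticity the match on system, stream and partition is equivalent to plain equality on the ssp, which is why the new lookup behaves like the old direct put.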
   
   Now, suppose this job enables elasticity with elasticity.factor = 2 and uses the same grouper.
   The job model will look like this: Task0_0_2 consumes <(i0,0), (b0,0), (b1,0)> // where (i0,0) means keyBucket 0 of i0 ssp.
   Task0_1_2 consumes <(i0,1), (b0,1), (b1,1)> // where (i0,1) means keyBucket 1 of i0 ssp.
   
   Now, when Task0_0_2 finishes consuming, say, offset 4 of i0, update(Task0_0_2, i0, 4) is called.
   When looking for i0 in Task0_0_2's list of ssps, (i0,0) is found and added to lastProcessedOffsets. The lookup works the same way for the broadcast partitions b0 and b1.
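
The elasticity case can be sketched in the same spirit. Again, this is only an illustration under assumptions: `BucketedSsp`, `ElasticOffsetSketch`, the use of -1 to mean "no key bucket", and the "kafka", "input" and "bcast" names are hypothetical and not taken from the PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an ssp that also carries a key bucket.
// Assumption: keyBucket = -1 means "no key bucket".
final case class BucketedSsp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

// Hypothetical, simplified model of the update path with elasticity enabled.
class ElasticOffsetSketch(taskModel: Map[String, Set[BucketedSsp]]) {
  val lastProcessedOffsets = mutable.Map.empty[String, mutable.Map[BucketedSsp, String]]

  def update(taskName: String, ssp: BucketedSsp, offset: String): Unit = {
    // Match on system, stream and partition only; keyBucket is deliberately ignored,
    // so the incoming ssp resolves to the task's own (ssp, keyBucket) entry.
    taskModel.getOrElse(taskName, Set.empty[BucketedSsp])
      .find(s => s.system == ssp.system && s.stream == ssp.stream && s.partition == ssp.partition)
      .foreach { s =>
        lastProcessedOffsets
          .getOrElseUpdate(taskName, mutable.Map.empty[BucketedSsp, String])
          .put(s, offset)
      }
  }
}

object ElasticityExample {
  // Each elastic task consumes one key bucket of i0, b0 and b1.
  def model(bucket: Int): Set[BucketedSsp] = Set(
    BucketedSsp("kafka", "input", 0, bucket),
    BucketedSsp("kafka", "bcast", 0, bucket),
    BucketedSsp("kafka", "bcast", 1, bucket))

  def main(args: Array[String]): Unit = {
    val sketch = new ElasticOffsetSketch(Map("Task0_0_2" -> model(0), "Task0_1_2" -> model(1)))
    // update(Task0_0_2, i0, 4): the caller passes i0 without a key bucket.
    sketch.update("Task0_0_2", BucketedSsp("kafka", "input", 0), "4")
    println(sketch.lastProcessedOffsets("Task0_0_2"))
  }
}
```

Because each task holds exactly one key bucket of a given ssp, the `find` can return at most one entry per task, which is the property the change depends on.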
   
   This would be similar for the other grouper - GroupBySystemStreamPartition.


