Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/01/20 23:48:45 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

lakshmi-manasa-g opened a new pull request #1576:
URL: https://github.com/apache/samza/pull/1576


   **Feature:** Elasticity (SAMZA-2687) allows a Samza job to have more tasks than it has input SystemStreamPartitions (SSPs). Thus, a job can scale up beyond its input partition count without needing to repartition the input stream.
   - This is achieved with elastic tasks, which behave like regular tasks for all practical purposes, except that an elastic task consumes only a subset of an SSP's messages.
   - With an elasticity factor F (an integer), the number of elastic tasks will be F times N, where N is the original task count.
   - The F elastic tasks derived from an original task each consume a subset of the same SSP as the original task. Each SSP is split into F subsets (aka key buckets), and a message falls into bucket i if message.key.hash() % F == i.
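The bucketing rule message.key.hash() % F == i can be sketched in Java as below. This is a minimal, hypothetical helper (KeyBucketSketch and bucketOf are illustrative names, not Samza APIs). One deliberate deviation: Java's % yields a negative result for a negative hashCode(), so the sketch uses Math.floorMod to keep every bucket within 0 to F-1; whether the PR itself handles negative hashes this way is not stated here.

```java
import java.util.Objects;

/** Hypothetical helper: maps a message key to one of F key buckets. */
final class KeyBucketSketch {
  private KeyBucketSketch() {}

  /**
   * Returns the key bucket for {@code key} given elasticity factor {@code factor}.
   * Math.floorMod (an assumption of this sketch, instead of %) keeps keys with a
   * negative hashCode() inside [0, factor - 1]. A null key hashes to 0.
   */
  static int bucketOf(Object key, int factor) {
    if (factor < 1) {
      throw new IllegalArgumentException("elasticity factor must be >= 1, got " + factor);
    }
    return Math.floorMod(Objects.hashCode(key), factor);
  }
}
```

For example, the key "a" (hashCode 97) lands in bucket 1 when F = 4, and with F = 1 every key lands in bucket 0.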
    
   **Changes:**
   1. Introduce the config job.elasticity.factor for enabling elasticity. If the job has N tasks without elasticity, then with factor F > 1 there will be F times N (elastic) tasks.
   2. Add a "key bucket" (an integer ranging from 0 to F-1) to SSP, which identifies a subset of the messages within the SSP.
   3. Compute the key bucket that an IncomingMessageEnvelope falls into, given elasticity factor F.
   4. Update SamzaObjectMapper to serialize/deserialize the keyBucket component of SSP.
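To make the F times N arithmetic concrete, the sketch below expands N partitions into N * F (partition, keyBucket) units, one per elastic task. It assumes, for simplicity, one original task per partition (as with GroupBySystemStreamPartition); ElasticTaskSketch and ElasticUnit are hypothetical names and do not reflect how the PR actually builds its task model.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: expands N partitions into N * F (partition, keyBucket) units. */
final class ElasticTaskSketch {
  private ElasticTaskSketch() {}

  /** One elastic task's input: a partition plus a key bucket in [0, factor - 1]. */
  static final class ElasticUnit {
    final int partition;
    final int keyBucket;

    ElasticUnit(int partition, int keyBucket) {
      this.partition = partition;
      this.keyBucket = keyBucket;
    }
  }

  /** Returns partitionCount * factor units, ordered by partition, then by key bucket. */
  static List<ElasticUnit> elasticUnits(int partitionCount, int factor) {
    if (partitionCount < 0 || factor < 1) {
      throw new IllegalArgumentException("need partitionCount >= 0 and factor >= 1");
    }
    List<ElasticUnit> units = new ArrayList<>();
    for (int p = 0; p < partitionCount; p++) {
      for (int b = 0; b < factor; b++) {
        units.add(new ElasticUnit(p, b));
      }
    }
    return units;
  }
}
```

With 4 partitions and F = 2 this yields 8 units; with F = 1 it degenerates to the original 4.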
    
   
**Tests:** updated unit tests and added new ones.
   

   
**API Changes:** no public API changes
    
   **Upgrade Instructions:** N/A
    
   **Usage Instructions:** Set job.elasticity.factor to a value greater than 1 to enable elasticity for the job.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] rayman7718 commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r797231333



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       But can't you still have the != 0 check in toString() to preserve existing behavior?







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r798913861



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       Yes, you are right. Currently my indexes are 0-based, hence -1 is the default when elasticity is off.
   
   If I chose keyBucket = 0 for elasticity off, I would have to start the elasticity-enabled indexes from 1, 2, ..., which is no different from using -1 as the default when elasticity is off and starting the indexes from 0, 1, 2, ....







[GitHub] [samza] mynameborat commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r798890172



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       How exactly is this doable? Is your bucket index 1-based? When elasticity is enabled, one of the buckets will have index 0, and hence its representation would differ from the others.
   







[GitHub] [samza] rayman7718 commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r795871201



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       +1, keyBucket can just be 0.

##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       +1, keyBucket can just be 0 by default.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r797181750



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       While developing the second PR (https://github.com/apache/samza/pull/1580), I realized that there is a bigger problem with having keyBucket = 0 in ssp.toString() when elasticity is not enabled.
   
   GroupBySystemStreamPartition uses ssp.toString() as the taskName, and task-level metrics (like sends, etc.) use the task name. Including keyBucket = 0 in every ssp.toString(), even when elasticity is not enabled, would change existing task names and result in loss of metrics history.
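The metrics-history concern can be illustrated with a small, hypothetical sketch: if task names are derived from ssp.toString(), appending ", 0" changes the name, so a lookup keyed by the new name finds no prior history. The string format is an assumption modeled on the `SystemStreamPartition [system, stream, partition, 0]` example in this thread, and the Map stands in for whatever metrics store a deployment actually uses.

```java
import java.util.Map;

/** Hypothetical sketch of why changing SSP.toString() breaks task-level metric continuity. */
final class MetricsContinuitySketch {
  private MetricsContinuitySketch() {}

  // Task name as derived from ssp.toString() today (format assumed).
  static String legacyTaskName(String system, String stream, int partition) {
    return "SystemStreamPartition [" + system + ", " + stream + ", " + partition + "]";
  }

  // Task name if toString() always appended keyBucket, even with elasticity off.
  static String bucketedTaskName(String system, String stream, int partition, int keyBucket) {
    return "SystemStreamPartition [" + system + ", " + stream + ", " + partition + ", " + keyBucket + "]";
  }

  /** Returns the prior "sends" count recorded under taskName, or 0 if there is no history. */
  static long priorSends(Map<String, Long> history, String taskName) {
    return history.getOrDefault(taskName, 0L);
  }
}
```

History recorded under the legacy name is found by the legacy name but not by the bucketed one, which is the loss the comment describes.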







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r798882177



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       Yes, that is doable, but IMO it doesn't differ much from the current != -1 check. Please let me know if you feel strongly about this and I can update it.







[GitHub] [samza] mynameborat commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r789239499



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       Why not default keyBucket to just 0 or 1, depending on how you base the bucket index? It cuts short all the additional handling of -1 and so on, and it also makes logical sense: in the event of keyBucket < 1, there is one bucket to which all of the messages belonging to the task are assigned.
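The reviewer's argument can be checked with a short, hypothetical sketch: if a task's key bucket defaults to 0 and the bucket is computed as hash mod F, then with F = 1 every message maps to bucket 0, so a single ownership check covers both the elastic and non-elastic cases without a -1 special case. DefaultBucketSketch and ownsMessage are illustrative names, and floorMod is used so negative hashes stay in range.

```java
/** Hypothetical sketch: a default key bucket of 0 needs no special casing when F = 1. */
final class DefaultBucketSketch {
  private DefaultBucketSketch() {}

  // Assumed default when elasticity is disabled (factor = 1).
  static final int DEFAULT_KEY_BUCKET = 0;

  /** Bucket for a key hash; floorMod keeps negative hashes within [0, factor - 1]. */
  static int bucketFor(int keyHash, int factor) {
    return Math.floorMod(keyHash, factor);
  }

  /** True if a task assigned to taskKeyBucket should process a message with this key hash. */
  static boolean ownsMessage(int taskKeyBucket, int keyHash, int factor) {
    return bucketFor(keyHash, factor) == taskKeyBucket;
  }
}
```

With factor 1, ownsMessage(DEFAULT_KEY_BUCKET, h, 1) is true for every hash h, which is the "one bucket holds all of the task's messages" case.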







[GitHub] [samza] mynameborat merged pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1576:
URL: https://github.com/apache/samza/pull/1576


   





[GitHub] [samza] mynameborat commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r799669059



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -74,6 +94,7 @@ private int computeHashCode() {
     final int prime = 31;
     int result = super.hashCode();
     result = prime * result + ((partition == null) ? 0 : partition.hashCode());
+    result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);

Review comment:
       I just realized this hash function yields the same hash for bucket 0 with elasticity enabled and for -1 with elasticity disabled.
   
   I feel you could treat 0 as the default and modify `toString()` to include `keyBucket` regardless, and then just regenerate the dashboards, or explicitly make this an API-breaking change in metric names, since we are going to run into it anyway.
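The collision can be reproduced with a simplified copy of the hashing arithmetic from the diff hunk. This hypothetical sketch replaces the real super.hashCode() and partition.hashCode() inputs with plain ints; only the keyBucket line mirrors the diff exactly. Note that equal hash codes are legal in Java as long as equals() still tells the two instances apart; the sketch only shows the arithmetic.

```java
/** Hypothetical reproduction of the hash scheme in the diff, with simplified int inputs. */
final class HashCollisionSketch {
  private HashCollisionSketch() {}

  static int hashAsInDiff(int baseHash, int partitionHash, int keyBucket) {
    final int prime = 31;
    int result = baseHash;                       // stands in for super.hashCode()
    result = prime * result + partitionHash;     // stands in for partition.hashCode()
    // Mirrors: result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);
    result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);
    return result;
  }
}
```

For inputs (7, 3), keyBucket -1 and keyBucket 0 both produce 6820, while keyBucket 1 produces 6821.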







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r796171042



##########
File path: samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
##########
@@ -38,6 +39,7 @@ public SystemStreamPartition(String system, String stream, Partition partition)
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;

Review comment:
       Agreed that keeping keyBucket = 0 when there is no elasticity makes this particular SSP class simpler.
   
   My reason for setting it to -1 was as follows:
   making keyBucket = 0 and removing the `keyBucket == -1` check means that every time an SSP is printed into logs and the task model, it will appear as `SystemStreamPartition [system, stream, partition, 0]` even with no elasticity enabled, which can be confusing.
   
   This `keyBucket == -1` check is confined to this class, so I think it is better to have this additional check than to introduce an SSP containing 0 everywhere.
   
   Please let me know if this makes sense.
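The sentinel approach described in this comment can be sketched as a simplified, hypothetical stand-in for SystemStreamPartition's toString(): the key bucket is appended only when it is not -1, so output for non-elastic jobs stays exactly as before. SspToStringSketch and describe are illustrative names, and the bracketed format is assumed from the example quoted in the comment.

```java
/** Hypothetical, simplified stand-in for SSP toString() handling of keyBucket. */
final class SspToStringSketch {
  private SspToStringSketch() {}

  // Sentinel meaning "elasticity disabled", as in the diff's default of -1.
  static final int NO_KEY_BUCKET = -1;

  static String describe(String system, String stream, int partition, int keyBucket) {
    String base = "SystemStreamPartition [" + system + ", " + stream + ", " + partition;
    // Append the key bucket only when elasticity is enabled, so legacy output is unchanged.
    return keyBucket == NO_KEY_BUCKET ? base + "]" : base + ", " + keyBucket + "]";
  }
}
```

With the sentinel, bucket 0 of an elastic job still prints its index, which is what distinguishes it from the non-elastic case.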

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -466,4 +472,12 @@ public boolean getStartpointEnabled() {
   public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
+
+  public boolean getElasticityEnabled() {

Review comment:
       Is the question why `getElasticityEnabled` is needed when we have `getElasticityFactor`?
   Callers that want to know whether elasticity is enabled should not need to know that elasticity.factor = 1 means disabled and > 1 means enabled.
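A minimal sketch of the two accessors, assuming a plain Map of config strings and a default factor of 1 (both assumptions; this is not Samza's actual JobConfig code). It shows how getElasticityEnabled keeps the "factor > 1 means enabled" rule in one place so callers never interpret the factor themselves.

```java
import java.util.Map;

/** Hypothetical JobConfig-style accessors; not Samza's actual JobConfig implementation. */
final class ElasticityConfigSketch {
  static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
  // Assumed default: a factor of 1 means elasticity is disabled.
  static final int DEFAULT_ELASTICITY_FACTOR = 1;

  private final Map<String, String> config;

  ElasticityConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  int getElasticityFactor() {
    String raw = config.get(JOB_ELASTICITY_FACTOR);
    int factor = (raw == null) ? DEFAULT_ELASTICITY_FACTOR : Integer.parseInt(raw.trim());
    if (factor < 1) {
      throw new IllegalArgumentException(JOB_ELASTICITY_FACTOR + " must be >= 1, got " + factor);
    }
    return factor;
  }

  /** Encapsulates the "factor > 1 means enabled" rule for callers. */
  boolean getElasticityEnabled() {
    return getElasticityFactor() > 1;
  }
}
```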







[GitHub] [samza] rayman7718 commented on a change in pull request #1576: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1576:
URL: https://github.com/apache/samza/pull/1576#discussion_r795873202



##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -466,4 +472,12 @@ public boolean getStartpointEnabled() {
   public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
+
+  public boolean getElasticityEnabled() {

Review comment:
       Not sure why this is needed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org