Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/02/01 18:16:16 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

lakshmi-manasa-g opened a new pull request #1580:
URL: https://github.com/apache/samza/pull/1580


   **Feature:** Elasticity (SAMZA-2687) allows a Samza job to have more tasks than the number of input SystemStreamPartitions (SSPs). Thus, a job can scale up beyond its input partition count without needing to repartition the input stream.
   This is achieved with elastic tasks, which are the same as regular tasks for all practical purposes, except that an elastic task consumes only a subset of the messages of an SSP.
   With an elasticity factor F (an integer), the number of elastic tasks will be F times N, where N is the original task count.
   The F elastic tasks per original task all consume subsets of the same SSP as the original task. There will be F subsets (aka key buckets) per SSP, and a message falls into SSP bucket 'i' if message.key.hash() % F == i.
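
   The bucketing rule above can be illustrated with a minimal sketch. The class and method names are hypothetical and not part of Samza, and `Math.floorMod` is used here only as a stand-in that keeps the bucket non-negative; the PR's own computation lives in `IncomingMessageEnvelope`.

```java
// Hypothetical helper, not part of Samza: illustrates the bucketing rule only.
final class KeyBucketSketch {
  // Maps a message key to a bucket in [0, elasticityFactor).
  // Math.floorMod keeps the result non-negative even for negative hash codes.
  static int keyBucket(Object key, int elasticityFactor) {
    return Math.floorMod(key.hashCode(), elasticityFactor);
  }

  // Total number of elastic tasks: F times the original task count N.
  static int elasticTaskCount(int originalTaskCount, int elasticityFactor) {
    return originalTaskCount * elasticityFactor;
  }

  public static void main(String[] args) {
    int factor = 4;
    for (String key : new String[] {"user-1", "user-2", "user-3"}) {
      System.out.println(key + " -> bucket " + keyBucket(key, factor));
    }
    System.out.println("elastic tasks: " + elasticTaskCount(8, factor));
  }
}
```

   Every message with the same key lands in the same bucket, so per-key ordering within an SSP is preserved across the F elastic tasks.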
   
   **Previous PR** = https://github.com/apache/samza/pull/1576 (that commit is also part of this PR; it will be removed once the first PR is merged and this PR's branch is rebased)
   
   **Changes:**
   
   1. Update the SSP groupers GroupByPartition and GroupBySystemStreamPartition to generate F x N (elastic) tasks, where F = elasticity factor and N = original (without elasticity) task count.
   2. Update SamzaContainer and RunLoopFactory to pass the elasticity factor to RunLoop and TaskInstance.
   3. Update RunLoop to use the elasticity-factor-based keyBucket of the incoming envelope's input SSP to find the (elastic) task for the envelope.
   4. Update TaskInstance to use the SSP with KeyBucket for its bookkeeping of caught-up SSPs and so on.
   5. Update SystemConsumers to shield all the underlying SystemConsumers of the input systems from the KeyBucket concept. That is, SystemConsumers (ConsumerMultiplexer) takes the SSP with KeyBucket and passes down only the SSP (without keyBucket) to the system consumer.
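
   As a rough illustration of change 1, the sketch below generates the F x N task names for a partition-based grouper. It is a simplified stand-in that uses plain strings rather than Samza's `TaskName` and `SystemStreamPartition`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the grouper logic; plain strings replace Samza's TaskName.
final class ElasticTaskNamesSketch {
  // Returns one task name per partition when elasticity is off (factor <= 1),
  // and elasticityFactor names per partition when it is on.
  static List<String> taskNames(int partitionCount, int elasticityFactor) {
    List<String> names = new ArrayList<>();
    for (int partition = 0; partition < partitionCount; partition++) {
      if (elasticityFactor > 1) {
        for (int keyBucket = 0; keyBucket < elasticityFactor; keyBucket++) {
          names.add(String.format("Partition %d %d", partition, keyBucket));
        }
      } else {
        names.add(String.format("Partition %d", partition));
      }
    }
    return names;
  }

  public static void main(String[] args) {
    // Two partitions with factor 3 yield six elastic task names.
    System.out.println(taskNames(2, 3));
  }
}
```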
   
   Currently not supported when elasticity is enabled:
   
   1. Checkpoints and previous offsets (i.e., elasticity currently works only with reset offsets), broadcast streams, and SSP groupers other than GroupBySSP and GroupByPartition.
   2. Metrics: task-level and SSP-level metrics. The container might not report correct values (for example, chose_object will account for all messages in the SSP, but should correspond only to the messages in the SSP KeyBucket processed by the container).
   Additional PRs will be raised for these.
   
   **Tests:** <TBD> adding new tests
   
   **API Changes:** no public API changes
   **Upgrade Instructions:** N/A
   **Usage Instructions:** set the config job.elasticity.factor > 1 to enable elasticity for the job.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809601800



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
         continue;
       }
 
-      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
-      if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<>());
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each partition create ElasticityFactor number of tasks
+      // i.e; result will have number of tasks =  ElasticityFactor X number of partitions
+      // each task will have name Partition_X_Y where X is the partition number and Y <= elasticityFactor
+      // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {
+          TaskName taskName = new TaskName(String.format("Partition %d %d", ssp.getPartition().getPartitionId(), i));
+          SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);
+          addToTaskNameSSPMap(groupedMap, taskName, sspWithKeyBucket);
+        }
+      } else {
+        TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+        addToTaskNameSSPMap(groupedMap, taskName, ssp);

Review comment:
       Also, this can be simplified into something like:
   
   ``for (int i = 0; i < elasticityFactor; i++) {
     int partitionId = ssp.getPartition().getPartitionId();
     // one name format when elasticity is off, partition plus key bucket when it is on
     TaskName taskName = new TaskName(elasticityFactor == 1
         ? String.format("Partition %d", partitionId)
         : String.format("Partition %d %d", partitionId, i));
     int keyBucket = elasticityFactor == 1 ? -1 : i;
     SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
     groupedMap.putIfAbsent(taskName, new HashSet<>());
     groupedMap.get(taskName).add(sspWithKeyBucket);
   }``







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809596622



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
         continue;
       }
 
-      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
-      if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<>());
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each partition create ElasticityFactor number of tasks
+      // i.e; result will have number of tasks =  ElasticityFactor X number of partitions
+      // each task will have name Partition_X_Y where X is the partition number and Y <= elasticityFactor
+      // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {

Review comment:
       nit: for ( int keyBucket = 0; keyBucket < elasticityFactor; ....







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797235895



##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -624,7 +625,7 @@ object SamzaContainer extends Logging {
       taskThreadPool,
       maxThrottlingDelayMs,
       samzaContainerMetrics,
-      taskConfig,
+      config,

Review comment:
       see comment above 







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797233568



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -239,8 +258,29 @@ private void runTasks(IncomingMessageEnvelope envelope) {
     if (!shutdownNow) {
       if (envelope != null) {
         PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-        for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
-          worker.state.insertEnvelope(pendingEnvelope);
+        SystemStreamPartition sspOfEnvelope = null;
+        // when elasticity is enabled
+        // the tasks actually consume a keyBucket of the ssp.
+        // hence use the SSP with keybucket to find the worker(s) for the envelope
+        if (elasticityFactor <= 1) {
+          sspOfEnvelope = envelope.getSystemStreamPartition();
+        } else {
+          sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
+          log.trace("elasticity enabled. using the ssp of the envelope as {}", sspOfEnvelope);
+        }

Review comment:
       Can't this be one call, so that
   envelope.getSSP(elasticity) returns the current envelope.getSSP() when elasticity = 1?
   That way, the method
   envelope.getSSP() can be deleted.
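
   One possible shape for such a single call, sketched with simplified stand-ins for `SystemStreamPartition` and `IncomingMessageEnvelope`. All names here are hypothetical, `keyBucket == -1` is assumed to mean "no key bucket", and `Math.floorMod` is a substitute for the PR's own hashing:

```java
import java.util.Objects;

// Hypothetical, simplified model; not Samza's actual classes.
final class SingleCallSketch {
  static final class Ssp {
    final String system;
    final String stream;
    final int partition;
    final int keyBucket; // -1 is assumed to mean "no key bucket"

    Ssp(String system, String stream, int partition, int keyBucket) {
      this.system = system;
      this.stream = stream;
      this.partition = partition;
      this.keyBucket = keyBucket;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Ssp)) {
        return false;
      }
      Ssp other = (Ssp) o;
      return system.equals(other.system) && stream.equals(other.stream)
          && partition == other.partition && keyBucket == other.keyBucket;
    }

    @Override
    public int hashCode() {
      return Objects.hash(system, stream, partition, keyBucket);
    }
  }

  static final class Envelope {
    private final Ssp ssp;
    private final Object key;
    private final String offset;

    Envelope(Ssp ssp, Object key, String offset) {
      this.ssp = ssp;
      this.key = key;
      this.offset = offset;
    }

    // With factor <= 1 this returns the plain SSP, so callers need no branch.
    Ssp getSystemStreamPartition(int elasticityFactor) {
      if (elasticityFactor <= 1) {
        return ssp;
      }
      Object keyOrOffset = key != null ? key : offset;
      int keyBucket = Math.floorMod(keyOrOffset.hashCode(), elasticityFactor);
      return new Ssp(ssp.system, ssp.stream, ssp.partition, keyBucket);
    }
  }
}
```

   With this shape, the RunLoop could call `getSystemStreamPartition(elasticityFactor)` unconditionally and drop its if/else.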







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809595687



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -250,6 +285,40 @@ private void runTasks(IncomingMessageEnvelope envelope) {
     }
   }
 
+  /**
+   * when elasticity is not enabled, fetch the workers from sspToTaskWorkerMapping using envelope.getSSP()
+   * when elasticity is enabled,
+   *       sspToTaskWorkerMapping has workers for a SSP which has keyBucket
+   *       hence need to use envelop.getSSP(elasticityFactor)
+   *       Additionally, when envelope is EnofStream, it needs to be sent to all works for the ssp irrespective of keyBucket

Review comment:
       Could we list out all the message types that map to more than one worker,
   e.g., EOS, watermark, etc.?
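
   To make the question concrete, here is a small sketch of the fan-out being discussed: data messages go to the workers of one key bucket, while control messages go to the workers of every key bucket of the SSP. The set of message types treated as control messages is an assumption for illustration (the quoted Javadoc confirms only end-of-stream), and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; worker and SSP identities are plain strings.
final class ControlMessageFanOutSketch {
  // Which types count as control messages is assumed here, not taken from the PR.
  enum MessageType { DATA, END_OF_STREAM, WATERMARK }

  // workersBySspBucket is keyed by "<ssp>/<keyBucket>".
  static List<String> workersFor(Map<String, List<String>> workersBySspBucket, String ssp,
      int keyBucket, int elasticityFactor, MessageType type) {
    List<String> result = new ArrayList<>();
    if (type == MessageType.DATA) {
      // Data messages belong to exactly one key bucket.
      result.addAll(workersBySspBucket.getOrDefault(ssp + "/" + keyBucket, Collections.emptyList()));
    } else {
      // Control messages are delivered to every key bucket of the SSP.
      for (int bucket = 0; bucket < elasticityFactor; bucket++) {
        result.addAll(workersBySspBucket.getOrDefault(ssp + "/" + bucket, Collections.emptyList()));
      }
    }
    return result;
  }
}
```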







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797236185



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -172,7 +172,10 @@ class SystemConsumers (
   def start {
     for ((systemStreamPartition, offset) <- sspToRegisteredOffsets.asScala) {
       val consumer = consumers(systemStreamPartition.getSystem)
-      consumer.register(systemStreamPartition, offset)
+      // If elasticity is enabled then the RunLoop gives SSP with keybucket
+      // but the actual systemConsumer which consumes from the input does not know about KeyBucket.
+      // hence, use an SSP without KeyBucket
+      consumer.register(new SystemStreamPartition(systemStreamPartition.getSystemStream, systemStreamPartition.getPartition), offset)

Review comment:
       Is there another way we can do this? Adding this discrepancy adds confusion.







[GitHub] [samza] xiefan46 commented on pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on pull request #1580:
URL: https://github.com/apache/samza/pull/1580#issuecomment-1028263098


   lgtm





[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797234179



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -239,8 +258,29 @@ private void runTasks(IncomingMessageEnvelope envelope) {
     if (!shutdownNow) {
       if (envelope != null) {
         PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-        for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
-          worker.state.insertEnvelope(pendingEnvelope);
+        SystemStreamPartition sspOfEnvelope = null;
+        // when elasticity is enabled
+        // the tasks actually consume a keyBucket of the ssp.
+        // hence use the SSP with keybucket to find the worker(s) for the envelope
+        if (elasticityFactor <= 1) {
+          sspOfEnvelope = envelope.getSystemStreamPartition();
+        } else {
+          sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
+          log.trace("elasticity enabled. using the ssp of the envelope as {}", sspOfEnvelope);
+        }
+        List<AsyncTaskWorker> listOfWorkersForEnvelope = sspToTaskWorkerMapping.get(sspOfEnvelope);
+        if (listOfWorkersForEnvelope != null) {
+          for (AsyncTaskWorker worker : listOfWorkersForEnvelope) {
+            worker.state.insertEnvelope(pendingEnvelope);
+          }
+        } else if (elasticityFactor > 1) {
+          // when elasticity is enabled

Review comment:
       Add comment 
   // if listOfWorkersForEnvelope is null and elasticity factor > 1, then 







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797235793



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -69,4 +82,12 @@ public GroupByPartition(Config config) {
 
     return groupedMap;
   }
+
+  private void addToTaskNameSSPMap(Map<TaskName, Set<SystemStreamPartition>> groupedMap, TaskName taskName,

Review comment:
       static?







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809602335



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
##########
@@ -50,9 +53,20 @@ public GroupBySystemStreamPartition(Config config) {
         continue;
       }
 
-      HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>();
-      sspSet.add(ssp);
-      groupedMap.put(new TaskName(ssp.toString()), sspSet);
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each ssp create ElasticityFactor number of tasks
+      // i.e; result will have number of tasks =  ElasticityFactor X number of SSP
+      // each task will have name SSP[system,stream,partition,keyBucket] keyBucket <= elasticityFactor
+      // each task portion correspdnding to keyBucket of the SSP.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {
+          SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);

Review comment:
       Same as above: this can be simplified,
   and then you don't need the two-line method.







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809595188



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -239,8 +260,22 @@ private void runTasks(IncomingMessageEnvelope envelope) {
     if (!shutdownNow) {
       if (envelope != null) {
         PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-        for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
-          worker.state.insertEnvelope(pendingEnvelope);
+        // when elasticity is enabled
+        // the tasks actually consume a keyBucket of the ssp.
+        // hence use the SSP with keybucket to find the worker(s) for the envelope

Review comment:
       Could we add a comment to indicate why an SSP+keyBucket can map to more than one worker?







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809596256



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -69,4 +82,12 @@ public GroupByPartition(Config config) {
 
     return groupedMap;
   }
+
+  private static void addToTaskNameSSPMap(Map<TaskName, Set<SystemStreamPartition>> groupedMap, TaskName taskName,
+      SystemStreamPartition ssp) {
+    if (!groupedMap.containsKey(taskName)) {
+      groupedMap.put(taskName, new HashSet<>());
+    }

Review comment:
       putIfAbsent?
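
   For reference, a minimal sketch of the two standard `java.util.Map` idioms that could replace the `containsKey`/`put` pair. Plain strings stand in for `TaskName` and `SystemStreamPartition`, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; strings replace Samza's TaskName and SystemStreamPartition.
final class GroupedMapSketch {
  // putIfAbsent inserts the empty set only when the key has no mapping yet.
  static void addWithPutIfAbsent(Map<String, Set<String>> groupedMap, String taskName, String ssp) {
    groupedMap.putIfAbsent(taskName, new HashSet<>());
    groupedMap.get(taskName).add(ssp);
  }

  // computeIfAbsent returns the existing or newly created set, so one statement suffices
  // and no HashSet is allocated when the key is already present.
  static void addWithComputeIfAbsent(Map<String, Set<String>> groupedMap, String taskName, String ssp) {
    groupedMap.computeIfAbsent(taskName, name -> new HashSet<>()).add(ssp);
  }
}
```

   Both variants leave the map in the same state as the original helper.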







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r799921230



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
##########
@@ -187,6 +187,77 @@ public void testDeserializeContainerModelIdFieldOnly() throws IOException {
     deserializeFromObjectNode(jobModelJson);
   }
 
+  @Test
+  public void testSerializeSystemStreamPartition() throws IOException {
+    // case 1: keyBucket not explicitly mentioned
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    ObjectNode expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+
+    //Case 2: with non-null keyBucket
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+    assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket"));
+  }
+
+  @Test
+  public void testDeserializeSystemStreamPartition() throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // case 1: keyBucket not explicitly mentioned
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+
+    // case 2: explicitly set key bucket
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+  }
+

Review comment:
       added two scenarios -> one where pre elastic aka old-samza-object-mapper serializes and new one deserializes and another scenario where new one serializes and old one deserializes. pl lmk if this makes sense.







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797236508



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
##########
@@ -187,6 +187,77 @@ public void testDeserializeContainerModelIdFieldOnly() throws IOException {
     deserializeFromObjectNode(jobModelJson);
   }
 
+  @Test
+  public void testSerializeSystemStreamPartition() throws IOException {
+    // case 1: keyBucket not explicitly mentioned
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    ObjectNode expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+
+    //Case 2: with non-null keyBucket
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+    assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket"));
+  }
+
+  @Test
+  public void testDeserializeSystemStreamPartition() throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // case 1: keyBucket not explicitly mentioned
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+
+    // case 2: explicitly set key bucket
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+  }
+

Review comment:
       Is it possible to add a test where samza-object-mapper tries to deserialize values written by the "old-samza-object-mapper" that doesn't have your change?
   We want to test that jobs will be able to roll forward and backward across Samza versions seamlessly.







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797232872



##########
File path: samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
##########
@@ -111,6 +111,13 @@ public SystemStreamPartition getSystemStreamPartition() {
     return systemStreamPartition;
   }
 
+  // used for elasticity to determine which elastic task should handle this envelope
+  public SystemStreamPartition getSystemStreamPartition(int elasticityFactor) {
+    Object envelopeKeyorOffset = key != null ? key : offset;
+    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode() % elasticityFactor);

Review comment:
       10 % 3 is 1 
   -10 % 3 is 2, 
   so need to instead use Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor ?
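
   For reference when weighing the two forms: in Java, `%` takes the sign of the dividend, so `-10 % 3` evaluates to `-1` (the value 2 is what floored modulo, as in `Math.floorMod`, gives). The sketch below compares the candidates; the class and method names are hypothetical. Note that `Math.abs(Integer.MIN_VALUE)` is still negative, which affects the `Math.abs(hash) % factor` form.

```java
// Hypothetical comparison of three ways to derive a bucket from a hash code.
final class RemainderSignSketch {
  // Form used in the diff: remainder first, then absolute value.
  static int absOfRemainder(int hash, int factor) {
    return Math.abs(hash % factor);
  }

  // Form proposed in the comment: absolute value first, then remainder.
  // Math.abs(Integer.MIN_VALUE) overflows and stays negative.
  static int remainderOfAbs(int hash, int factor) {
    return Math.abs(hash) % factor;
  }

  // Floored modulo: always in [0, factor) for a positive factor.
  static int floorMod(int hash, int factor) {
    return Math.floorMod(hash, factor);
  }

  public static void main(String[] args) {
    System.out.println(-10 % 3);                                // -1
    System.out.println(absOfRemainder(-10, 3));                 // 1
    System.out.println(remainderOfAbs(-10, 3));                 // 1
    System.out.println(floorMod(-10, 3));                       // 2
    System.out.println(remainderOfAbs(Integer.MIN_VALUE, 3));   // -2
  }
}
```

   Under these semantics, `Math.abs(hash % factor)` and `Math.floorMod(hash, factor)` both stay within `[0, factor)` for every `int`, though they can assign different buckets to negative hashes.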







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797235793



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -69,4 +82,12 @@ public GroupByPartition(Config config) {
 
     return groupedMap;
   }
+
+  private void addToTaskNameSSPMap(Map<TaskName, Set<SystemStreamPartition>> groupedMap, TaskName taskName,

Review comment:
       static helper method?







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809602335



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
##########
@@ -50,9 +53,20 @@ public GroupBySystemStreamPartition(Config config) {
         continue;
       }
 
-      HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>();
-      sspSet.add(ssp);
-      groupedMap.put(new TaskName(ssp.toString()), sspSet);
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each ssp create ElasticityFactor number of tasks
+      // i.e., the result will have number of tasks = elasticityFactor x number of SSPs
+      // each task will have name SSP[system,stream,partition,keyBucket] where 0 <= keyBucket < elasticityFactor
+      // each task consumes the portion of the SSP corresponding to its keyBucket.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {
+          SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);

Review comment:
       Same as above.







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797235211



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
##########
@@ -39,9 +41,10 @@ public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, Ru
       ExecutorService threadPool,
       long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics,
-      TaskConfig taskConfig,
+      Config config,

Review comment:
       It would be preferable to pass the elasticity factor as an additional argument alongside the TaskConfig, rather than passing the entire Config object.







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809604368



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
##########
@@ -254,6 +257,29 @@ void registerInputStream(SystemStream input) {
     return Collections.emptyList();
   }
 
+  /**
+   * returns true if current task should broadcast control message (end of stream/watermark) to others
+   * if elasticity is not enabled (elasticity factor <= 1) then the current task is eligible
+   * if elasticity is enabled, pick the elastic task consuming keybucket = 0 of the ssp as the eligible task
+   * @param ssp ssp that the current task consumes
+   * @return true if current task is eligible to broadcast control messages
+   */
+  private boolean shouldTaskBroadcastToOtherPartitions(SystemStreamPartition ssp) {
+    if (elasticityFactor <= 1) {
+      return true;
+    }
+
+    // if elasticity is enabled then taskModel actually has ssp with keybuckets in it
+    // check if this current elastic task processes the first keybucket (=0) of the ssp given
+    return
+        taskModel.getSystemStreamPartitions().stream()
+            .filter(sspInModel ->
+                ssp.getSystemStream().equals(sspInModel.getSystemStream()) // ensure same systemstream as ssp given
+                && ssp.getPartition().equals(sspInModel.getPartition()) // ensure same partition as ssp given
+                && sspInModel.getKeyBucket() == 0) // ensure sspInModel has keyBucket 0
+            .count() > 0; // >0 means current task consumes the keyBucket = 0 of the ssp given

Review comment:
       Why not just
   return ssp.getKeyBucket() == 0
   ?
   
   Also, would the elasticityFactor <= 1 check then be unnecessary, since the key bucket will be -1?







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809601800



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
         continue;
       }
 
-      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
-      if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<>());
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each partition create ElasticityFactor number of tasks
+      // i.e., the result will have number of tasks = elasticityFactor x number of partitions
+      // each task will have name Partition_X_Y where X is the partition number and 0 <= Y < elasticityFactor
+      // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {
+          TaskName taskName = new TaskName(String.format("Partition %d %d", ssp.getPartition().getPartitionId(), i));
+          SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);
+          addToTaskNameSSPMap(groupedMap, taskName, sspWithKeyBucket);
+        }
+      } else {
+        TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+        addToTaskNameSSPMap(groupedMap, taskName, ssp);

Review comment:
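       Also, this can be simplified into:
   
   `for (int i = 0; i < elasticityFactor; i++) {
     int partitionId = ssp.getPartition().getPartitionId();
     // format the task name with the partition id (and the key bucket when elastic)
     String taskNameStr = elasticityFactor == 1
         ? String.format("Partition %d", partitionId)
         : String.format("Partition %d %d", partitionId, i);
     TaskName taskName = new TaskName(taskNameStr);
     // -1 means "no key bucket" when elasticity is disabled
     int keyBucket = elasticityFactor == 1 ? -1 : i;
     SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
     groupedMap.putIfAbsent(taskName, new HashSet<>());
     groupedMap.get(taskName).add(sspWithKeyBucket);
   }`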
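
To make the resulting task layout concrete, here is a minimal self-contained sketch of the grouping idea, assuming elasticityFactor >= 1. Plain strings stand in for Samza's TaskName and SystemStreamPartition, the class name is hypothetical, and -1 as the "no key bucket" value is an assumption taken from this review thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; strings stand in for TaskName and SystemStreamPartition.
class ElasticGroupingSketch {
  static final int NO_KEY_BUCKET = -1; // assumed sentinel when elasticity is disabled

  // Returns task name -> labels of the (stream, partition, keyBucket) slices it consumes.
  static Map<String, Set<String>> group(List<String> streams, int partitionCount, int elasticityFactor) {
    if (elasticityFactor < 1) {
      throw new IllegalArgumentException("elasticityFactor must be >= 1");
    }
    boolean elastic = elasticityFactor > 1;
    Map<String, Set<String>> grouped = new LinkedHashMap<>();
    for (String stream : streams) {
      for (int partition = 0; partition < partitionCount; partition++) {
        for (int i = 0; i < elasticityFactor; i++) {
          String taskName = elastic
              ? String.format("Partition %d %d", partition, i)
              : String.format("Partition %d", partition);
          int keyBucket = elastic ? i : NO_KEY_BUCKET;
          String ssp = String.format("%s[%d,%d]", stream, partition, keyBucket);
          grouped.computeIfAbsent(taskName, name -> new LinkedHashSet<>()).add(ssp);
        }
      }
    }
    return grouped;
  }

  public static void main(String[] args) {
    // Two streams with two partitions each and factor 3 yield 2 x 3 = 6 tasks.
    Map<String, Set<String>> tasks = group(Arrays.asList("orders", "clicks"), 2, 3);
    tasks.forEach((task, ssps) -> System.out.println(task + " -> " + ssps));
  }
}
```

With a factor of 1 the inner loop runs once and the task names fall back to the non-elastic "Partition X" form, which is why a single loop can cover both cases.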







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809605434



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -401,6 +409,10 @@ class SystemConsumers (
 
     updated
   }
+
+  private def getSSPWithoutKeyBucket(sspWithKeyBucket: SystemStreamPartition): SystemStreamPartition = {

Review comment:
       Maybe a better method name would be
   removeKeyBucket?







[GitHub] [samza] rayman7718 commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809596622



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
         continue;
       }
 
-      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
-      if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<>());
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each partition create ElasticityFactor number of tasks
+      // i.e., the result will have number of tasks = elasticityFactor x number of partitions
+      // each task will have name Partition_X_Y where X is the partition number and 0 <= Y < elasticityFactor
+      // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {

Review comment:
       nit: int keyBucket = 0







[GitHub] [samza] prateekm merged pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
prateekm merged pull request #1580:
URL: https://github.com/apache/samza/pull/1580


   





[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r810299062



##########
File path: samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
         continue;
       }
 
-      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
-      if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<>());
+      // if elasticity factor > 1 then elasticity is enabled
+      // for each partition create ElasticityFactor number of tasks
+      // i.e., the result will have number of tasks = elasticityFactor x number of partitions
+      // each task will have name Partition_X_Y where X is the partition number and 0 <= Y < elasticityFactor
+      // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+      // #todo: add the elasticity ticket for more details?
+      if (elasticityFactor > 1) {
+        for (int i = 0; i < elasticityFactor; i++) {
+          TaskName taskName = new TaskName(String.format("Partition %d %d", ssp.getPartition().getPartitionId(), i));
+          SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);
+          addToTaskNameSSPMap(groupedMap, taskName, sspWithKeyBucket);
+        }
+      } else {
+        TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+        addToTaskNameSSPMap(groupedMap, taskName, ssp);

Review comment:
       This needs elasticityFactor to be >= 1. Updated the JobConfig to ensure this.

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
##########
@@ -254,6 +257,29 @@ void registerInputStream(SystemStream input) {
     return Collections.emptyList();
   }
 
+  /**
+   * returns true if current task should broadcast control message (end of stream/watermark) to others
+   * if elasticity is not enabled (elasticity factor <= 1) then the current task is eligible
+   * if elasticity is enabled, pick the elastic task consuming keybucket = 0 of the ssp as the eligible task
+   * @param ssp ssp that the current task consumes
+   * @return true if current task is eligible to broadcast control messages
+   */
+  private boolean shouldTaskBroadcastToOtherPartitions(SystemStreamPartition ssp) {
+    if (elasticityFactor <= 1) {
+      return true;
+    }
+
+    // if elasticity is enabled then taskModel actually has ssp with keybuckets in it
+    // check if this current elastic task processes the first keybucket (=0) of the ssp given
+    return
+        taskModel.getSystemStreamPartitions().stream()
+            .filter(sspInModel ->
+                ssp.getSystemStream().equals(sspInModel.getSystemStream()) // ensure same systemstream as ssp given
+                && ssp.getPartition().equals(sspInModel.getPartition()) // ensure same partition as ssp given
+                && sspInModel.getKeyBucket() == 0) // ensure sspInModel has keyBucket 0
+            .count() > 0; // >0 means current task consumes the keyBucket = 0 of the ssp given

Review comment:
       No, because a task could be consuming another SSP with keyBucket = 0. We need to ensure that it is consuming keyBucket = 0 of the particular SSP it consumes AND for which it was aggregating end-of-stream/watermark messages.
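
The distinction can be illustrated with a small sketch. It assumes a hypothetical task that holds key bucket 0 of one SSP and key bucket 1 of another; whether a given grouper produces such an assignment is not shown in this thread. The Ssp class below is a stand-in, not Samza's SystemStreamPartition.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the eligibility check; not the OperatorImpl code.
class BroadcastEligibilitySketch {
  // Stand-in for SystemStreamPartition with a key bucket (-1 = none, an assumption).
  static final class Ssp {
    final String stream;
    final int partition;
    final int keyBucket;

    Ssp(String stream, int partition, int keyBucket) {
      this.stream = stream;
      this.partition = partition;
      this.keyBucket = keyBucket;
    }
  }

  // True if the task owns key bucket 0 of the same stream and partition as the given SSP.
  static boolean shouldBroadcast(List<Ssp> taskSsps, Ssp ssp, int elasticityFactor) {
    if (elasticityFactor <= 1) {
      return true; // elasticity disabled: every task is eligible
    }
    return taskSsps.stream().anyMatch(inModel ->
        inModel.stream.equals(ssp.stream)
            && inModel.partition == ssp.partition
            && inModel.keyBucket == 0);
  }

  public static void main(String[] args) {
    // Hypothetical task: bucket 0 of orders[0], bucket 1 of clicks[0].
    List<Ssp> task = Arrays.asList(new Ssp("orders", 0, 0), new Ssp("clicks", 0, 1));
    System.out.println(shouldBroadcast(task, new Ssp("orders", 0, -1), 2)); // true
    System.out.println(shouldBroadcast(task, new Ssp("clicks", 0, -1), 2)); // false
  }
}
```

Checking only that the task holds some SSP with key bucket 0 would return true for both calls, which is the case the reply above rules out.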







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r799921343



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -172,7 +172,10 @@ class SystemConsumers (
   def start {
     for ((systemStreamPartition, offset) <- sspToRegisteredOffsets.asScala) {
       val consumer = consumers(systemStreamPartition.getSystem)
-      consumer.register(systemStreamPartition, offset)
+      // If elasticity is enabled then the RunLoop gives SSP with keybucket
+      // but the actual systemConsumer which consumes from the input does not know about KeyBucket.
+      // hence, use an SSP without KeyBucket
+      consumer.register(new SystemStreamPartition(systemStreamPartition.getSystemStream, systemStreamPartition.getPartition), offset)

Review comment:
       Working on this. Trying to find a way that will also work well for the next planned PR for existing container metrics reporting.
   Will update soon.
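
As a rough illustration of the idea in the diff above (hiding the key bucket from the underlying consumer), the sketch below uses a hypothetical stand-in class rather than Samza's SystemStreamPartition. The withoutKeyBucket name and the -1 sentinel are assumptions made for this example.

```java
import java.util.Objects;

// Hypothetical stand-in for Samza's SystemStreamPartition; not the real class.
final class SspSketch {
  static final int NO_KEY_BUCKET = -1; // assumed sentinel for "no key bucket"

  final String system;
  final String stream;
  final int partition;
  final int keyBucket;

  SspSketch(String system, String stream, int partition, int keyBucket) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
    this.keyBucket = keyBucket;
  }

  // Keeps system, stream and partition but drops the key bucket.
  SspSketch withoutKeyBucket() {
    return new SspSketch(system, stream, partition, NO_KEY_BUCKET);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SspSketch)) {
      return false;
    }
    SspSketch other = (SspSketch) o;
    return system.equals(other.system) && stream.equals(other.stream)
        && partition == other.partition && keyBucket == other.keyBucket;
  }

  @Override
  public int hashCode() {
    return Objects.hash(system, stream, partition, keyBucket);
  }

  public static void main(String[] args) {
    SspSketch bucket0 = new SspSketch("kafka", "pageviews", 0, 0);
    SspSketch bucket1 = new SspSketch("kafka", "pageviews", 0, 1);
    System.out.println(bucket0.equals(bucket1)); // false
    System.out.println(bucket0.withoutKeyBucket().equals(bucket1.withoutKeyBucket())); // true
  }
}
```

Two elastic SSPs that differ only in key bucket collapse to the same plain SSP, which is what the underlying system consumer is expected to see.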

##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -239,8 +258,29 @@ private void runTasks(IncomingMessageEnvelope envelope) {
     if (!shutdownNow) {
       if (envelope != null) {
         PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-        for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
-          worker.state.insertEnvelope(pendingEnvelope);
+        SystemStreamPartition sspOfEnvelope = null;
+        // when elasticity is enabled
+        // the tasks actually consume a keyBucket of the ssp.
+        // hence use the SSP with keybucket to find the worker(s) for the envelope
+        if (elasticityFactor <= 1) {
+          sspOfEnvelope = envelope.getSystemStreamPartition();
+        } else {
+          sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
+          log.trace("elasticity enabled. using the ssp of the envelope as {}", sspOfEnvelope);
+        }

Review comment:
       Done.
   But the no-argument envelope.getSystemStreamPartition() method cannot be deleted, as it is used by many Samza components that have no knowledge of, or access to, elasticityFactor.
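
The routing step in the RunLoop diff above can be sketched as a lookup keyed by SSP plus key bucket. Everything here is a simplified, hypothetical model: string keys replace SystemStreamPartition, worker names replace AsyncTaskWorker, and -1 is assumed to mean "no key bucket".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of envelope-to-worker routing; not the RunLoop code.
class ElasticRoutingSketch {
  static String sspKey(String stream, int partition, int keyBucket) {
    return stream + "/" + partition + "/" + keyBucket;
  }

  // Mirrors the diff: hash the key if present, otherwise the offset.
  static int keyBucket(Object key, String offset, int elasticityFactor) {
    Object keyOrOffset = key != null ? key : offset;
    return Math.abs(keyOrOffset.hashCode() % elasticityFactor);
  }

  static List<String> workersFor(Map<String, List<String>> sspToWorkers, String stream,
      int partition, Object key, String offset, int elasticityFactor) {
    // Without elasticity the plain SSP (no key bucket) is the lookup key.
    int bucket = elasticityFactor <= 1 ? -1 : keyBucket(key, offset, elasticityFactor);
    return sspToWorkers.getOrDefault(sspKey(stream, partition, bucket), Collections.emptyList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> sspToWorkers = new HashMap<>();
    sspToWorkers.put(sspKey("pageviews", 0, 0), Arrays.asList("Partition 0 0"));
    sspToWorkers.put(sspKey("pageviews", 0, 1), Arrays.asList("Partition 0 1"));
    // Integer keys hash to their own value, so 4 lands in bucket 0 and 7 in bucket 1.
    System.out.println(workersFor(sspToWorkers, "pageviews", 0, 4, "100", 2));
    System.out.println(workersFor(sspToWorkers, "pageviews", 0, 7, "101", 2));
  }
}
```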



