You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/04/16 00:41:34 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request, #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

lakshmi-manasa-g opened a new pull request, #1598:
URL: https://github.com/apache/samza/pull/1598

   Feature: Elasticity (SAMZA-2687) for a Samza job allows job to have more tasks than the number of input SystemStreamPartition(SSP). Thus, a job can scale up beyond its input partition count without needing the repartition the input stream.
   This current PR is to compute the last processed offsets when container starts up using checkpoints from previous deploys. The current deploy or the previous deploys may have elasticity factor > 1
   
   Changes:
   1. Introduce ElasticityUtils which contains computeLastProcessedOffsetsFromCheckpointMap that computes a task’s last processed offsets using all the checkpoints present in the checkpoint stream for all tasks that were ever part of the job model.
   2. Update OffsetManager.loadOffsetsFromCheckpointManager to compute checkpoint using the ElasticityUtils if either the config “job.elasticity.checkpoints.enabled” or if checkpoint stream had checkpoints with elastic task names
   2. Introduces config “job.elasticity.checkpoints.enabled” config which is disabled by default and should be enabled when rolling back to disable elasticity or going back to elasticity factor = 1
   
   Tests:
   1. added tests for ElasticityUtils (yet to parametrize this test class)
   2. pending: to add an unit test for OffsetManager
   
   API changes: 
     no public api change. new config introduced “job.elasticity.checkpoints.enabled” (default false) which if true will check for previous deploys’ checkpoints
   
   Upgrade instructions: none
   
   Usage instructions: set “job.elasticity.checkpoints.enabled” to true when rolling back to disable elasticity.
   
   Backwards compatible: yes. does not affect the existing checkpoint computation as “job.elasticity.checkpoints.enabled” = false by default.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864341106


##########
samza-core/src/main/java/org/apache/samza/config/JobConfig.java:
##########
@@ -479,6 +484,10 @@ public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  public boolean getElasticityCheckpointEnabled() {
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED);

Review Comment:
   Can we just use getElasticityEnabled() inplace of this?
   That is, whenever elasticity factor = 1, we default to false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865475398


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -461,12 +470,19 @@ class OffsetManager(
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-      Map(taskName -> Map())
+    if (!elasticityCheckpointsEnabled) {

Review Comment:
   no but updated the flow to not use the new config but rather rely on existence of elastic task names in the checkpoint stream



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864342393


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;

Review Comment:
   org.apache.samza.elasticity.util;



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {

Review Comment:
   public class Utils



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864343288


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {

Review Comment:
   private?



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.debug("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {

Review Comment:
   private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rayman7718 merged pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rayman7718 merged PR #1598:
URL: https://github.com/apache/samza/pull/1598


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864343575


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -461,12 +470,19 @@ class OffsetManager(
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-      Map(taskName -> Map())
+    if (!elasticityCheckpointsEnabled) {

Review Comment:
   Could just be elasticityEnabled ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] xiefan46 commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r858045928


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);

Review Comment:
   Looks like the patterns can be determined at the beginning. Maybe create those patterns as final static variables when this class is created to improve the performance? 



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);

Review Comment:
   Same as the above comment, we can make those patterns as class members so that we only need to create them once



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,

Review Comment:
   Please correct me if I am wrong: In here, is it safe to use max value as the new offset for several different partitions? Let's say we have two virtual tasks that are consuming Partition 0_0_2 and Partition 0_1_2, and we want to combine the checkpoint offset of these two partitions into one, shouldn't we use the min value instead of the max value? 



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Given a set of checkpoints, find the min aka smallest offset for an ssp
+   * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
+   * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
+   * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
+   *
+   * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
+   * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
+   *
+   * With descendants, a little care is needed. there could be descendants with different elasticity factors.
+   * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
+   * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
+   * Across elasticity factors, largest works just like in ancestor
+   *
+   * Taking a concrete example
+   * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
+   *    Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *    Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *    Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *    From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *    we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *    Thus,
+   *       SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
+   *       SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *    If the checkpoint map has
+   *      Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
+   *      looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4
+   *      to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets.
+   *      picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed.
+   *      hence we need to take min of offsets within an elasticity factor.
+   *
+   * Given checkpoints for all the tasks in the checkpoint stream,
+   * computing the last proc offset for an ssp checkpoint for a task,
+   * the following needs to be met.
+   *    1. Ancestors: we need to take largest offset among ancestors for an ssp
+   *    2. Descendants:
+   *         a. group descendants by their elasticityFactor.
+   *         b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
+   *         c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
+   *    3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
+   *
+   * @param taskName
+   * @param taskSSPSet
+   * @param checkpointMap
+   * @param systemAdmins
+   * @return
+   */
+  public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPSet,
+      Map<TaskName, Checkpoint> checkpointMap,
+      SystemAdmins systemAdmins) {
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound =
+        getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
+    Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight();
+
+    Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
+
+    taskSSPSet.forEach(ssp_withKeyBucket -> {
+      log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket);
+
+      SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
+          ssp_withKeyBucket.getPartition());
+
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+      String currentLastOffsetForSSP = null;
+
+      String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);

Review Comment:
   qq: what's the use case for combining several ancestors? If we increase the elasticity factor, looks like we only need to assign the offset of the current ancestor to its descendants. And if we decrease the elasticity factor, we only need to combine the offsets of several descendants using the min function. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] xiefan46 commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r861203346


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,

Review Comment:
   I see! Thanks for answering my question! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865109887


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {

Review Comment:
   yes it can be done. but code wise will look similar and additionally caller has to have knowledge of -1 meaning not elastic task.
   that said, i am looking to get rid of this method. the latest commit to this pr actually doesnt use this method.
   



##########
samza-core/src/main/java/org/apache/samza/config/JobConfig.java:
##########
@@ -479,6 +484,10 @@ public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  public boolean getElasticityCheckpointEnabled() {
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED);

Review Comment:
   so the reason for introducing this new config is the following scenario
   
   1. First, job has no elasticity aka factor = 1 by default and checkpoints are written with taskName like "Partition 0"
   2. Next, job enabled elasticity aka factor > 1 and checkpoints are written with taskName like "Parittion 0_0_2" and not taskname 0
   3. Then after a while, job disables elasticity aka factor =1 again then we want to use checkpoints 0_0_2 to compute checkpoint of task with name "Partition 0". 
   
   This config when true lets us know that we need to looking for checkpoints of other tasks even if the taksName does not match exactly. as in look for 0_0_2 and 0_1_2 to compute checkpoint of task 0.
   
   without this config, we would have to infer that there was elasticity enabled in the past by looking at all checkpoints present in the stream. And this flow will apply to jobs even when elasticity was never enabled for the job. I wanted to avoid changes to the existing flow and guard all new changes behind a config and hence this.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865475194


##########
samza-core/src/main/java/org/apache/samza/config/JobConfig.java:
##########
@@ -479,6 +484,10 @@ public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  public boolean getElasticityCheckpointEnabled() {
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED);

Review Comment:
   removed the config and using the existence of checkpoints with elastic names to trigger using all checkpoints for computing last proc offsets



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864342228


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+

Review Comment:
   /** 
   POJO / Util class to store system, stream, partition, and partition-key-bucket information associated with a Task, that is encoded in the task's name.
   
   */
   
   */



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864343575


##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -461,12 +470,19 @@ class OffsetManager(
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-      Map(taskName -> Map())
+    if (!elasticityCheckpointsEnabled) {

Review Comment:
   Could just be elasticityEnabled ?
   
   Branching could occur only when it is enabled, otherwise default to default path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r858902889


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Given a set of checkpoints, find the min aka smallest offset for an ssp
+   * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
+   * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
+   * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
+   *
+   * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
+   * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
+   *
+   * With descendants, a little care is needed. there could be descendants with different elasticity factors.
+   * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
+   * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
+   * Across elasticity factors, largest works just like in ancestor
+   *
+   * Taking a concrete example
+   * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
+   *    Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *    Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *    Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *    From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *    we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *    Thus,
+   *       SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
+   *       SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *    If the checkpoint map has
+   *      Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
+   *      looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4
+   *      to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets.
+   *      picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed.
+   *      hence we need to take min of offsets within an elasticity factor.
+   *
+   * Given checkpoints for all the tasks in the checkpoint stream,
+   * computing the last proc offset for an ssp checkpoint for a task,
+   * the following needs to be met.
+   *    1. Ancestors: we need to take largest offset among ancestors for an ssp
+   *    2. Descendants:
+   *         a. group descendants by their elasticityFactor.
+   *         b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
+   *         c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
+   *    3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
+   *
+   * @param taskName
+   * @param taskSSPSet
+   * @param checkpointMap
+   * @param systemAdmins
+   * @return
+   */
+  public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPSet,
+      Map<TaskName, Checkpoint> checkpointMap,
+      SystemAdmins systemAdmins) {
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound =
+        getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
+    Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight();
+
+    Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
+
+    taskSSPSet.forEach(ssp_withKeyBucket -> {
+      log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket);
+
+      SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
+          ssp_withKeyBucket.getPartition());
+
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+      String currentLastOffsetForSSP = null;
+
+      String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);

Review Comment:
   repeating what was explained in the previous comment again here - 
   Suppose we are computing starting offset for task "Partition 0_0_4" (elasticity 4). Then its ancestors are "Partition 0" and "Partition 0_0_2" and itself. Suppose the checkpoint topic has checkpoints for all of these ancestors. Then we take the max of these to find the starting offset. Max of these gives us the largest offset and that will correspond to the latest deploy just before this current one. 
   
   in essence, taking max is to find the last deploy before the current one.



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
+   * @param taskName any taskName
+   * @return ElasticTaskNameParts object containing system, stream, partition, keyBucket and elasticityFactor
+   *    for GroupByPartition task:
+   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
+   *         system and stream are empty "" strings and partition is the input partition,
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *    for GroupBySystemStreamPartition task:
+   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
+   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
+   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
+   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
+   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
+   *   for tasks created with other SSP groupers:
+   *        default ElasticTaskNameParts is returned which has empty system, stream,
+   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)),
+          Integer.valueOf(matcher.group(2)),
+          Integer.valueOf(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.valueOf(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * see doc for getTaskNameParts above
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)),
+          Integer.valueOf(matcher.group(4)),
+          Integer.valueOf(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.valueOf(matcher.group(3)));
+    }
+    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible");
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
+   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask
+   * @param otherTask
+   * @return true if otherTask is ancestor of currentTask, false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
+   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
+   * @param currentTask
+   * @param otherTask
+   * @return
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
+   * All ancestor checkpoints are put into a set
+   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    checkpointMap.keySet().forEach(otherTaskName -> {
+      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        if (!descendantCheckpoints.containsKey(otherEF)) {
+          descendantCheckpoints.put(otherEF, new HashSet<>());
+        }
+        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
+      }
+    });
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
+   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
+            .getPartition()
+            .equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (offsetFound.isPresent()) {
+      return offsetFound.get();
+    }
+    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+    return null;
+  }
+
+  /**
+   * Given a set of checkpoints, find the max aka largest offset for an ssp
+   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return offset - string if one exists else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,

Review Comment:
   Lets say we are computing the starting offset for task "Partition 0". Previous deploy had elasticity factor = 2 and hence the tasks were "Partition 0_0_2" and Partition 0_1_2". Both these tasks are descendants of "Partition 0". In this case, yes we should take the min of the two offsets. and that is what "descendantLastOffsetForSSP" in "computeLastProcessedOffsetsFromCheckpointMap" method does. 
   
   this particular method "getMaxOffsetForSSPInCheckpointSet" is to get the maximum offset for an ssp within a set of checkpoints. It will be used when computing offset from ancestors. Suppose we are computing starting offset for task "Partition 0_0_4" (elasticity 4). Then its ancestors are "Partition 0" and "Partition 0_0_2" and itself. Suppose the checkpoint topic has checkpoints for all of these ancestors. Then we take the max of these to find the starting offset. Max of these gives us the largest offset and that will correspond to the latest deploy just before this current one. 
   
   Hope this clarifies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864342294


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;

Review Comment:
   package org.apache.samza.elasticity.util;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865475247


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {

Review Comment:
   typically utils in samza are names like ZkUtils, BlobUtils, coordinationUtils and so on.
   so keeping this class as ElasticityUtils itself to be a lil clearer on what the class does.



##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {

Review Comment:
   actually, task name does not have key bucket or elasticity factor if elasticity is not enabled. 
   so dealing with that means the code will look similar to above. 
   
   retaining this method and removing the newly added config elasticity.checkpoints.enabled instead.
   
   added unit test for this method as it will now become part of the flow for all jobs irresp of whether they ever had elasticity enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864341952


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+public class ElasticTaskNameParts {

Review Comment:
   Can we just call it TaskNameComponents or TaskNameParts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] rmatharu commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r864343164


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {

Review Comment:
   can the caller instead use 
   getKeyBucket(TaskName) method instead 
   and act according to the value being -1 or 0,1,2,...etc ?
   Would that simplify the code and branching?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] lakshmi-manasa-g commented on a diff in pull request #1598: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on code in PR #1598:
URL: https://github.com/apache/samza/pull/1598#discussion_r865474621


##########
samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like Partition 0_1_2
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
+  // where 2 is the elasticity factor
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
+   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
+   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
+   * Both tasks have names ending with _%d where %d is the elasticity factor
+   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
+   * @return
+   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
+   *   for other tasks returns 1 which is the default elasticity factor
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    return getTaskNameParts(taskName).elasticityFactor;
+  }
+
+  /**
+   * checks if the given taskname is of a GroupByPartition task
+   * @param taskName of any task
+   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * checks if the given taskname is of a GroupBySystemStreamPartition task
+   * @param taskName of any task
+   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org