You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/04/16 00:40:03 UTC

[samza] branch elasticity-checkpoint-lastProcessedOffsets updated (e37fe259c -> 2cda17424)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a change to branch elasticity-checkpoint-lastProcessedOffsets
in repository https://gitbox.apache.org/repos/asf/samza.git


    from e37fe259c SAMZA-2730: Add process CPU usage metric using units of processor count instead of percentage (#1593)
     add 400d1972c SAMZA-2731: Add readAllCheckpoints to CheckpointManager and implement for KafkaCheckpointManager
     new 2cda17424 SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/samza/checkpoint/CheckpointManager.java |  13 +
 .../java/org/apache/samza/config/JobConfig.java    |   9 +
 .../samza/elasticity/ElasticTaskNameParts.java     |  80 ++++
 .../apache/samza/elasticity/ElasticityUtils.java   | 486 +++++++++++++++++++++
 .../apache/samza/checkpoint/OffsetManager.scala    |  34 +-
 .../org/apache/samza/config/TestJobConfig.java     |   9 +
 .../samza/elasticity/TestElasticityUtils.java      | 363 +++++++++++++++
 .../checkpoint/kafka/KafkaCheckpointManager.scala  |  32 +-
 .../kafka/TestKafkaCheckpointManager.java          |  19 +
 9 files changed, 1026 insertions(+), 19 deletions(-)
 create mode 100644 samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
 create mode 100644 samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java
 create mode 100644 samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java


[samza] 01/01: SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled

Posted by dc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch elasticity-checkpoint-lastProcessedOffsets
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 2cda1742469b5dcb0a3a623544875d59cf6661e1
Author: Manasa <mg...@linkedin.com>
AuthorDate: Fri Apr 15 16:52:45 2022 -0700

    SAMZA-2733: [Elasticity] Compute last processed offsets when container starts up using checkpoints from previous deploys when elasticity was enabled
---
 .../java/org/apache/samza/config/JobConfig.java    |   9 +
 .../samza/elasticity/ElasticTaskNameParts.java     |  80 ++++
 .../apache/samza/elasticity/ElasticityUtils.java   | 486 +++++++++++++++++++++
 .../apache/samza/checkpoint/OffsetManager.scala    |  34 +-
 .../org/apache/samza/config/TestJobConfig.java     |   9 +
 .../samza/elasticity/TestElasticityUtils.java      | 363 +++++++++++++++
 6 files changed, 972 insertions(+), 9 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 638474715..b637325a6 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -177,6 +177,11 @@ public class JobConfig extends MapConfig {
   public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
   private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
 
+  // If true, checkpoints written by previous deploys that had elasticity enabled are also used when
+  // computing where to resume. Set this to true when rolling back from an elastic deploy to a non-elastic one.
+  public static final String JOB_ELASTICITY_CHECKPOINTS_ENABLED = "job.elasticity.checkpoints.enabled";
+  public static final boolean DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED = false;
+
 
   // Enabled elasticity for the job
   // number of (elastic) tasks in the job will be old task count X elasticity factor
@@ -479,6 +484,10 @@ public class JobConfig extends MapConfig {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
 
+  /**
+   * Reads {@code job.elasticity.checkpoints.enabled}.
+   * @return the configured value, or {@code false} when the key is not set
+   */
+  public boolean getElasticityCheckpointEnabled() {
+    final boolean fallback = DEFAULT_JOB_ELASTICITY_CHECKPOINTS_ENABLED;
+    return getBoolean(JOB_ELASTICITY_CHECKPOINTS_ENABLED, fallback);
+  }
+
   public boolean getElasticityEnabled() {
     return getElasticityFactor() > 1;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
new file mode 100644
index 000000000..f9268d9be
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticTaskNameParts.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+/**
+ * Value object holding the pieces encoded in a task name: system, stream, partition,
+ * keyBucket and elasticityFactor.
+ * Constructors that do not take a system/stream set both to the empty string; constructors that do not
+ * take a keyBucket/elasticityFactor use {@link #DEFAULT_KEY_BUCKET} and {@link #DEFAULT_ELASTICITY_FACTOR}.
+ */
+public class ElasticTaskNameParts {
+
+  /** keyBucket used when none is supplied (a non-elastic task). */
+  public static final int DEFAULT_KEY_BUCKET = 0;
+  /** elasticityFactor used when none is supplied (a non-elastic task). */
+  public static final int DEFAULT_ELASTICITY_FACTOR = 1;
+  /** Sentinel partition meaning "partition could not be determined". */
+  public static final int INVALID_PARTITION = -1;
+
+  public final String system;
+  public final String stream;
+  public final int partition;
+  public final int keyBucket;
+  public final int elasticityFactor;
+
+  public ElasticTaskNameParts(int partition) {
+    this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
+  }
+
+  public ElasticTaskNameParts(int partition, int keyBucket, int elasticityFactor) {
+    this("", "", partition, keyBucket, elasticityFactor);
+  }
+
+  public ElasticTaskNameParts(String system, String stream, int partition) {
+    this(system, stream, partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
+  }
+
+  public ElasticTaskNameParts(String system, String stream, int partition, int keyBucket, int elasticityFactor) {
+    this.system = system;
+    this.stream = stream;
+    this.partition = partition;
+    this.keyBucket = keyBucket;
+    this.elasticityFactor = elasticityFactor;
+  }
+
+  /**
+   * Two instances are equal when all five components are equal.
+   * system/stream are compared null-safely.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof ElasticTaskNameParts)) {
+      return false;
+    }
+    ElasticTaskNameParts that = (ElasticTaskNameParts) o;
+    return partition == that.partition
+        && keyBucket == that.keyBucket
+        && elasticityFactor == that.elasticityFactor
+        && java.util.Objects.equals(system, that.system)
+        && java.util.Objects.equals(stream, that.stream);
+  }
+
+  /**
+   * Same 31-based combination as before (Objects.hash folds elements exactly that way),
+   * but tolerant of null system/stream.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(system, stream, partition, keyBucket, elasticityFactor);
+  }
+
+  @Override
+  public String toString() {
+    return "ElasticTaskNameParts{system=" + system
+        + ", stream=" + stream
+        + ", partition=" + partition
+        + ", keyBucket=" + keyBucket
+        + ", elasticityFactor=" + elasticityFactor + "}";
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java
new file mode 100644
index 000000000..990702cd9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/elasticity/ElasticityUtils.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class with util methods to be used for checkpoint computation when elasticity is enabled
+ * Elasticity is supported only  for tasks created by either
+ * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
+ * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
+ */
+public class ElasticityUtils {
+  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
+
+  // GroupByPartition tasks have names like "Partition 0_1_2"
+  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor.
+  // Without elasticity the name is just "Partition 0".
+  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
+  static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
+
+  // GroupBySSP tasks have names like "SystemStreamPartition [<system>, <stream>, <partition>, <keyBucket>]_2"
+  // where 2 is the elasticity factor.
+  // Without elasticity the name is "SystemStreamPartition [<system>, <stream>, <partition>]".
+  // The (\\S+) groups mean system and stream are matched as whitespace-free tokens.
+  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
+  static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
+  static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
+  static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
+
+  /**
+   * Extracts the elasticity factor (the trailing "_N") from a task name.
+   * Elastic GroupByPartition names look like "Partition 0_1_2"; elastic GroupBySystemStreamPartition
+   * names look like "SystemStreamPartition [systemA, streamB, 0, 1]_2". In both, the last number is the factor.
+   * @param taskName name of a GroupByPartition or GroupBySystemStreamPartition task
+   * @return the elasticity factor parsed from the name; 1 (the default) for non-elastic names and for
+   *   tasks of other groupers. Throws IllegalArgumentException when the name has a GroupByPartition or
+   *   GroupBySystemStreamPartition prefix but matches neither expected format.
+   */
+  static int getElasticityFactorFromTaskName(TaskName taskName) {
+    ElasticTaskNameParts parts = getTaskNameParts(taskName);
+    return parts.elasticityFactor;
+  }
+
+  /**
+   * Tells whether a task name has the GroupByPartition shape.
+   * @param taskName name of any task
+   * @return true when the name begins with "Partition ", false otherwise
+   */
+  static boolean isGroupByPartitionTask(TaskName taskName) {
+    String name = taskName.getTaskName();
+    return name.startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
+  }
+
+  /**
+   * Tells whether a task name has the GroupBySystemStreamPartition shape.
+   * @param taskName name of any task
+   * @return true when the name begins with "SystemStreamPartition ", false otherwise
+   */
+  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
+    String name = taskName.getTaskName();
+    return name.startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
+  }
+
+  /**
+   * checks if given taskName is elastic aka created with an elasticity factor > 1
+   * @param taskName of any task
+   * @return true for following, false otherwise
+   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
+   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
+   */
+  static boolean isTaskNameElastic(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
+      Pattern p = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+      Matcher m = p.matcher(taskName.getTaskName());
+      return m.find();
+    }
+    return false;
+  }
+
+  /**
+   * Splits a task name into system, stream, partition, keyBucket and elasticityFactor.
+   *
+   * GroupByPartition tasks ("Partition 0_1_2" elastic, "Partition 0" non-elastic):
+   *    system and stream are "" and partition is the input partition;
+   *    keyBucket/elasticityFactor come from the name when elastic (1 and 2 above), else 0 and 1.
+   * GroupBySystemStreamPartition tasks ("SystemStreamPartition [systemA, streamB, 0, 1]_2" elastic,
+   * "SystemStreamPartition [systemA, streamB, 0]" non-elastic):
+   *    system, stream and partition come from the name (systemA, streamB, 0 above);
+   *    keyBucket/elasticityFactor come from the name when elastic (1 and 2 above), else 0 and 1.
+   * Tasks of any other grouper:
+   *    a warning is logged and parts with empty system/stream, partition -1, keyBucket 0 and
+   *    elasticityFactor 1 are returned.
+   *
+   * @param taskName any task name
+   * @return the parsed parts
+   * @throws IllegalArgumentException if the name has a GroupByPartition/GroupBySystemStreamPartition
+   *    prefix but matches neither the elastic nor the non-elastic format
+   */
+  static ElasticTaskNameParts getTaskNameParts(TaskName taskName) {
+    if (isGroupByPartitionTask(taskName)) {
+      return getTaskNameParts_GroupByPartition(taskName);
+    }
+    if (isGroupBySystemStreamPartitionTask(taskName)) {
+      return getTaskNameParts_GroupBySSP(taskName);
+    }
+    // unsupported grouper: fall back to defaults, flagged by an explicitly invalid partition
+    String unsupportedName = taskName.getTaskName();
+    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+        + "Elasticity is not supported for this taskName. "
+        + "Returning default ElasticTaskNameParts which has default keyBucket 0,"
+        + " default elasticityFactor 1 and invalid partition -1", unsupportedName);
+    return new ElasticTaskNameParts(ElasticTaskNameParts.INVALID_PARTITION);
+  }
+
+  /**
+   * Parses a GroupByPartition task name, either "Partition 0_1_2" (partition, keyBucket,
+   * elasticityFactor) or "Partition 0" (partition only; keyBucket 0 and elasticityFactor 1).
+   * system and stream of the result are empty strings.
+   * @param taskName a task name expected to start with "Partition "
+   * @return the parsed parts
+   * @throws IllegalArgumentException if the name matches neither format (the message includes the name)
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupByPartition(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
+
+    // The elastic form must be tried first: the non-elastic regex would also find "Partition 0"
+    // inside "Partition 0_1_2".
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.parseInt(matcher.group(1)),
+          Integer.parseInt(matcher.group(2)),
+          Integer.parseInt(matcher.group(3)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(Integer.parseInt(matcher.group(1)));
+    }
+    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible: " + taskNameStr);
+  }
+
+  /**
+   * Parses a GroupBySystemStreamPartition task name, either
+   * "SystemStreamPartition [systemA, streamB, 0, 1]_2" (system, stream, partition, keyBucket, elasticityFactor)
+   * or "SystemStreamPartition [systemA, streamB, 0]" (keyBucket 0 and elasticityFactor 1).
+   * @param taskName a task name expected to start with "SystemStreamPartition "
+   * @return the parsed parts
+   * @throws IllegalArgumentException if the name matches neither format (the message includes the name)
+   */
+  static ElasticTaskNameParts getTaskNameParts_GroupBySSP(TaskName taskName) {
+    String taskNameStr = taskName.getTaskName();
+    log.info("GetTaskNameParts for taskName {}", taskNameStr);
+    Pattern elasticTaskPattern = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
+    Pattern nonElasticTaskPattern = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
+
+    // Try the elastic form (with keyBucket and "_<elasticityFactor>" suffix) before the plain SSP form.
+    Matcher matcher = elasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.parseInt(matcher.group(3)),
+          Integer.parseInt(matcher.group(4)),
+          Integer.parseInt(matcher.group(5)));
+    }
+    matcher = nonElasticTaskPattern.matcher(taskNameStr);
+    if (matcher.find()) {
+      return new ElasticTaskNameParts(matcher.group(1),
+          matcher.group(2),
+          Integer.parseInt(matcher.group(3)));
+    }
+    // logged at error level, consistent with getTaskNameParts_GroupByPartition, since we are about to throw
+    log.error("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
+    throw new IllegalArgumentException("TaskName format incompatible: " + taskNameStr);
+  }
+
+  /**
+   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
+   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
+   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
+   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
+   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
+   *    For example:
+   *      case 1: elasticityFactor 2 to 1
+   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
+   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
+   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
+   *      case 2: elasticityFactor 2 to 2 - no change
+   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesn't change
+   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
+   *      case 3: elasticityFactor 4 to 2
+   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
+   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
+   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
+   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   *
+   * The containment used above, (y % n) % m == y % m, is only guaranteed when m divides n.
+   * For example, with elasticityFactor 3, keyBucket 2 holds y = 2 and y = 5. With elasticityFactor 2 these land in
+   * keyBuckets 0 and 1, so neither Partition 0_0_2 nor Partition 0_1_2 consumed everything Partition 0_2_3 consumes.
+   * Taking such a task's offset could skip messages. Tasks whose elasticity factors are not multiples of each
+   * other are therefore conservatively treated as unrelated.
+   *
+   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers,
+   * i.e. both currentTask and otherTask are created by GroupByPartition or both by GroupBySystemStreamPartition.
+   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
+   * @param currentTask task whose ancestors are being looked for
+   * @param otherTask candidate ancestor
+   * @return true if otherTask is ancestor of currentTask (a task is an ancestor of itself), false otherwise
+   */
+  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor > currentTaskNameParts.elasticityFactor
+        // keyBucket containment only holds when the ancestor's factor divides the current factor
+        || currentTaskNameParts.elasticityFactor % otherTaskNameParts.elasticityFactor != 0) {
+      return false;
+    }
+
+    return (currentTaskNameParts.keyBucket % otherTaskNameParts.elasticityFactor) == otherTaskNameParts.keyBucket;
+  }
+
+  /**
+   * See javadoc for isOtherTaskAncestorOfCurrentTask above.
+   * Given currentTask and otherTask,
+   *   if currentTask == otherTask, then it is not a descendant (unlike ancestor)
+   *   else, otherTask is a descendant of currentTask when currentTask is an ancestor of otherTask, i.e.
+   *   otherTask reads the same system/stream/partition, its elasticityFactor is strictly larger and a multiple
+   *   of currentTask's elasticityFactor, and otherTask.keyBucket % currentTask.elasticityFactor == currentTask.keyBucket.
+   * Elasticity factors that are not multiples of each other are conservatively treated as unrelated, because
+   * keyBucket containment, (y % n) % m == y % m, is only guaranteed when m divides n.
+   * @param currentTask task whose descendants are being looked for
+   * @param otherTask candidate descendant
+   * @return true if otherTask is a descendant of currentTask, false otherwise (including when the two tasks
+   *   were not both created by GroupByPartition or both by GroupBySystemStreamPartition)
+   */
+  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
+    log.info("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
+    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
+        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
+      return false;
+    }
+
+    ElasticTaskNameParts currentTaskNameParts = getTaskNameParts(currentTask);
+    ElasticTaskNameParts otherTaskNameParts = getTaskNameParts(otherTask);
+
+    if (!otherTaskNameParts.system.equals(currentTaskNameParts.system)
+        || !otherTaskNameParts.stream.equals(currentTaskNameParts.stream)
+        || otherTaskNameParts.partition != currentTaskNameParts.partition
+        || otherTaskNameParts.elasticityFactor <= currentTaskNameParts.elasticityFactor
+        // keyBucket containment only holds when the current factor divides the descendant's factor
+        || otherTaskNameParts.elasticityFactor % currentTaskNameParts.elasticityFactor != 0) {
+      return false;
+    }
+
+    return (otherTaskNameParts.keyBucket % currentTaskNameParts.elasticityFactor) == currentTaskNameParts.keyBucket;
+  }
+
+  /**
+   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendant checkpoints.
+   * All ancestor checkpoints are put into a set. A task counts as its own ancestor, so its own checkpoint,
+   * if present in the map, lands in that set.
+   * Descendant checkpoints are put into a map from the descendant's elasticityFactor to the descendants' checkpoints.
+   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
+   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
+   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
+   * @param taskName name of the task
+   * @param checkpointMap map from taskName to checkpoint
+   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
+   */
+  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
+      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
+    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
+    log.info("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
+    // iterate entries directly rather than keySet() + get(), which costs a second lookup per task
+    for (Map.Entry<TaskName, Checkpoint> entry : checkpointMap.entrySet()) {
+      TaskName otherTaskName = entry.getKey();
+      Checkpoint otherTaskCheckpoint = entry.getValue();
+      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
+        ancestorCheckpoints.add(otherTaskCheckpoint);
+      }
+      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
+        log.info("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
+        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
+        descendantCheckpoints.computeIfAbsent(otherEF, ef -> new HashSet<>()).add(otherTaskCheckpoint);
+      }
+    }
+    log.info("done computing all ancestors and descendants of {}", taskName);
+    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
+  }
+
+  /**
+   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp.
+   * Only the system, stream and partition portions of the SSP are matched; the keyBucket is not considered.
+   * A checkpoint belongs to one task, and a task consumes either the full SSP (no keyBucket)
+   * or exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint.
+   * @param checkpoint Checkpoint containing SSP -> offset
+   * @param ssp SystemStreamPartition for which an offset needs to be fetched
+   * @return offset for the ssp in the Checkpoint, or null if it doesn't exist
+   */
+  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
+    if (log.isInfoEnabled()) {
+      // rendering the whole offset map is only worth doing when it will actually be logged
+      String checkpointStr = checkpoint.getOffsets().entrySet().stream()
+          .map(k -> k.getKey() + " : " + k.getValue())
+          .collect(Collectors.joining(", ", "{", "}"));
+      log.info("for ssp {}, in checkpoint {}", ssp, checkpointStr);
+    }
+
+    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream())
+            && entry.getKey().getPartition().equals(ssp.getPartition()))
+        .map(Map.Entry::getValue)
+        .findFirst();
+    if (!offsetFound.isPresent()) {
+      log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
+      return null;
+    }
+    return offsetFound.get();
+  }
+
+  /**
+   * Given a set of checkpoints, finds the max (largest) offset for an ssp.
+   * "Largest" is determined by SystemAdmin.offsetComparator of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched; the keyBucket is not considered.
+   * NOTE(review): offsetComparator returns a boxed Integer; if it can return null for incomparable offsets,
+   * unboxing here would throw NPE. Confirm against the SystemAdmin in use.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which largest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return the largest offset if one exists, else null
+   */
+  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    // a single max() pass instead of sorting every candidate offset just to take the first one
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .max((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2))
+        .orElse(null);
+  }
+
+  /**
+   * Given a set of checkpoints, finds the min (smallest) offset for an ssp.
+   * "Smallest" is determined by SystemAdmin.offsetComparator of the ssp's system.
+   * Only the system, stream and partition portions of the SSP are matched; the keyBucket is not considered.
+   * NOTE(review): offsetComparator returns a boxed Integer; if it can return null for incomparable offsets,
+   * unboxing here would throw NPE. Confirm against the SystemAdmin in use.
+   * @param checkpointSet set of checkpoints
+   * @param ssp for which smallest offset is needed
+   * @param systemAdmin of the ssp.getSystem()
+   * @return the smallest offset if one exists, else null
+   */
+  static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
+      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
+    // a single min() pass instead of sorting every candidate offset just to take the first one
+    return checkpointSet.stream()
+        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
+        .filter(Objects::nonNull)
+        .min((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2))
+        .orElse(null);
+  }
+
+  /**
+   * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
+   * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
+   * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
+   * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
+   *
+   * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
+   * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
+   *
+   * With descendants, a little care is needed. there could be descendants with different elasticity factors.
+   * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
+   * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
+   * Across elasticity factors, largest works just like in ancestor
+   *
+   * Taking a concrete example
+   * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
+   *    Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
+   *    Partition 0_1_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
+   *    Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
+   *    Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
+   *    From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
+   *    we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+   *    Thus,
+   *       SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
+   *       SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
+   *    If the checkpoint map has
+   *      Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP2 : 6)
+   *      Looking at this map and knowing that offsets are monotonically increasing, it is clear that the last deploy was with elasticity factor = 4.
+   *      To get the checkpoint for Partition 0_0_2, we need to consider the last deploy's offsets.
+   *      Picking 6 (the offset for Partition 0_2_4) would mean that 0_0_2 starts processing from 6, but offset 5 was never processed
+   *      by Partition 0_0_4, whose checkpoint is only 4.
+   *      Hence we need to take the min of offsets within an elasticity factor.
+   *
+   * Given the checkpoints of all the tasks in the checkpoint stream,
+   * the last processed offset of an ssp for a task is computed with the following steps.
+   *    1. Ancestors: we need to take largest offset among ancestors for an ssp
+   *    2. Descendants:
+   *         a. group descendants by their elasticityFactor.
+   *         b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
+   *         c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
+   *    3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
+   *
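+   * @param taskName name of the task whose last processed offsets are computed
+   * @param taskSSPSet ssps (possibly carrying a keyBucket) consumed by the task
+   * @param checkpointMap checkpoints of all the tasks in the checkpoint stream, keyed by task name
+   * @param systemAdmins used to look up the {@link SystemAdmin} whose offsetComparator orders the offsets of each ssp's system
+   * @return the last processed offsets computed for the task's ssps, keyed by the task's ssp (including its keyBucket)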
+   */
+  public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPSet,
+      Map<TaskName, Checkpoint> checkpointMap,
+      SystemAdmins systemAdmins) {
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> ancestorsAndDescendantsFound =
+        getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
+    Set<Checkpoint> ancestorCheckpoints = ancestorsAndDescendantsFound.getLeft();
+    // descendants' checkpoints grouped by their elasticity factor
+    Map<Integer, Set<Checkpoint>> descendantCheckpoints = ancestorsAndDescendantsFound.getRight();
+
+    Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
+
+    taskSSPSet.forEach(sspWithKeyBucket -> {
+      log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, sspWithKeyBucket);
+
+      // other tasks may have checkpointed this ssp under a different keyBucket (or none),
+      // so the lookups below use only system, stream and partition
+      SystemStreamPartition ssp = new SystemStreamPartition(sspWithKeyBucket.getSystemStream(),
+          sspWithKeyBucket.getPartition());
+
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+      // step 1: largest offset among the ancestors
+      String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);
+
+      log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}",
+          taskName, sspWithKeyBucket, ancestorLastOffsetForSSP);
+
+      // step 2: smallest offset within each elasticity factor, then the largest of those.
+      // An elasticity factor whose descendants never checkpointed this ssp yields null; it is dropped because
+      // Stream#max (like findFirst) throws NullPointerException when the selected element is null.
+      String descendantLastOffsetForSSP = descendantCheckpoints.values().stream()
+          .map(checkpointsForEf -> getMinOffsetForSSPInCheckpointSet(checkpointsForEf, ssp, systemAdmin))
+          .filter(offset -> offset != null)
+          .max((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2))
+          .orElse(null);
+
+      log.info("for taskName {} and ssp {} got lastoffset from descendants as {}",
+          taskName, sspWithKeyBucket, descendantLastOffsetForSSP);
+
+      // step 3: the larger of the two. A missing side is resolved explicitly rather than handed to
+      // offsetComparator, which is not specified to handle null arguments; otherwise a missing descendant
+      // offset could win the comparison and discard a valid ancestor checkpoint.
+      String currentLastOffsetForSSP;
+      if (ancestorLastOffsetForSSP == null) {
+        currentLastOffsetForSSP = descendantLastOffsetForSSP;
+      } else if (descendantLastOffsetForSSP == null) {
+        currentLastOffsetForSSP = ancestorLastOffsetForSSP;
+      } else {
+        Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP);
+        // a positive comparison means ancestorLastOffsetForSSP > descendantLastOffsetForSSP
+        currentLastOffsetForSSP = (offsetComparison != null && offsetComparison > 0)
+            ? ancestorLastOffsetForSSP
+            : descendantLastOffsetForSSP;
+      }
+      log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, sspWithKeyBucket, currentLastOffsetForSSP);
+      // no offset anywhere in the lineage: leave the ssp out (as a regular checkpoint without it would)
+      // instead of mapping it to a null offset
+      if (currentLastOffsetForSSP != null) {
+        taskSSPOffsets.put(sspWithKeyBucket, currentLastOffsetForSSP);
+      }
+    });
+
+    String checkpointStr = taskSSPOffsets.entrySet().stream()
+        .map(k -> k.getKey() + " : " + k.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+    log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr);
+    return taskSSPOffsets;
+  }
+
+  /**
+   * Returns whether any task that wrote a checkpoint had an elastic task name, i.e. whether elasticity was
+   * enabled in at least one of the deploys that produced these checkpoints.
+   *
+   * @param checkpointMap checkpoints keyed by the name of the task that wrote them
+   * @return true if at least one task name in {@code checkpointMap} is elastic
+   */
+  public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) {
+    // a task name is elastic when it carries an elasticityFactor
+    return checkpointMap.keySet().stream().anyMatch(ElasticityUtils::isTaskNameElastic);
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 7491eaaaf..7a12625f0 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,12 +22,13 @@ package org.apache.samza.checkpoint
 import java.util
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.annotation.InterfaceStability
-import org.apache.samza.config.{Config, StreamConfig, SystemConfig}
+import org.apache.samza.checkpoint.OffsetManager.info
+import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.elasticity.ElasticityUtils
 import org.apache.samza.startpoint.{Startpoint, StartpointManager}
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
 import org.apache.samza.system._
@@ -105,7 +106,10 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
+    val elasticityCheckpointsEnabled = new JobConfig(config).getElasticityCheckpointEnabled
+
+    new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners,
+      offsetManagerMetrics, elasticityCheckpointsEnabled)
   }
 }
 
@@ -160,7 +164,12 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
    */
-  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging {
+  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
+
+  /**
+   * if true, checkpoints generated during elasticity deploys will be used for last processed offsets computation at container start
+   */
+  val elasticityCheckpointsEnabled: Boolean = false) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
@@ -461,12 +470,19 @@ class OffsetManager(
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-      Map(taskName -> Map())
+    val checkpointMap = checkpointManager.readAllCheckpoints()
+    if (!elasticityCheckpointsEnabled || !ElasticityUtils.wasElasticityEnabled(checkpointMap)) {
+      if (checkpoint != null) {
+        return Map(taskName -> checkpoint.getOffsets.asScala.toMap)
+      } else {
+        info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
+        return Map(taskName -> Map())
+      }
     }
+    info("Elasticity checkpoints is enabled and there was elasticity enabled in one of the previous deploys." +
+      "Last processed offsets computation at container start will use elasticity checkpoints if available.")
+    Map(taskName -> ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName,
+      systemStreamPartitions.get(taskName).get.asJava, checkpointMap, systemAdmins).asScala)
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 4d171662c..9cf70ff07 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -600,6 +600,15 @@ public class TestJobConfig {
         "false"))).getContainerHeartbeatMonitorEnabled());
   }
 
+  @Test
+  public void testGetElasticityCheckpointEnabled() {
+    assertFalse(new JobConfig(new MapConfig()).getElasticityCheckpointEnabled());
+    assertTrue(new JobConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED, "true"))).getElasticityCheckpointEnabled());
+    assertFalse(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_CHECKPOINTS_ENABLED,
+        "false"))).getElasticityCheckpointEnabled());
+  }
+
   @Test
   public void testGetElastictyEnabled() {
     // greater than 1 means enabled
diff --git a/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java
new file mode 100644
index 000000000..7590c9a3a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/elasticity/TestElasticityUtils.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.elasticity;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+// #TODO: going to make this entire class parameterized.
+/**
+ * Tests for {@link ElasticityUtils}: parsing of (elastic) task names, ancestor/descendant relations between tasks,
+ * and computation of last processed offsets from the checkpoints of all tasks.
+ */
+public class TestElasticityUtils {
+  private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0");
+  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2");
+  private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
+  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
+
+  @Test
+  public void testComputeLastProcessedOffsetsFromCheckpointMap() {
+    // Setup :
+    // there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job
+    // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2.
+    // Before elasticity, job has one task with name "Partition 0"
+    // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2"
+    //         Partition 0_0_2 consumes SSP[systemA, streamB, partition(0), keyBucket(0)]
+    //         Partition 0_1_2 consumes SSP[systemA, streamB, partition(0), keyBucket(1)]
+    // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4"
+    //         Partition 0_0_4 consumes SSP[systemA, streamB, partition(0), keyBucket(0)]
+    //         Partition 0_1_4 consumes SSP[systemA, streamB, partition(0), keyBucket(1)]
+    //         Partition 0_2_4 consumes SSP[systemA, streamB, partition(0), keyBucket(2)]
+    //         Partition 0_3_4 consumes SSP[systemA, streamB, partition(0), keyBucket(3)]
+    //
+    // From the definition of keyBucket computation using elasticity factor in
+    // IncomingMessageEnvelope.getSystemStreamPartition(elasticityFactor) as
+    // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor
+    // messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2
+    // messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2
+    // messages processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself
+
+    TaskName taskName = new TaskName("Partition 0_0_2");
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
+    SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
+    SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2);
+
+    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
+    // offsets ordering "1" < "2" < "3" < "4"
+    for (int i = 1; i <= 4; i++) {
+      for (int j = 1; j <= 4; j++) {
+        if (i != j) {
+          Mockito.when(mockSystemAdmin.offsetComparator(String.valueOf(i), String.valueOf(j)))
+              .thenReturn(Integer.compare(i, j));
+        }
+      }
+    }
+
+    SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
+    Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
+
+    // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself.
+    // hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also
+    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3"));
+    Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
+    Assert.assertEquals("4", result.get(ssp0));
+
+    // case 2: for task Partition 0_0_2: last deploy was with ef = 1
+    // hence "Partition 0" has the largest offset. Computing checkpoint for 0_0_2 should use this largest offset
+    checkpointMap = new HashMap<>();
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2"));
+    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
+    Assert.assertEquals("4", result.get(ssp0));
+
+    // case 3: for task Partition 0_0_2: last deploy was with ef = 4
+    // hence checkpoints of Partition 0_0_4 and Partition 0_2_4 are relevant.
+    // since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets
+    checkpointMap = new HashMap<>();
+    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
+    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2"));
+    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
+    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4"));
+    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
+        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
+    Assert.assertEquals("3", result.get(ssp0));
+  }
+
+  @Test
+  public void testTaskIsGroupByPartitionOrGroupBySSP() {
+    String msgPartition = "GroupByPartition task should start with Partition";
+    String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition";
+
+    Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION));
+    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION));
+
+    Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(
+        ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+
+    Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP));
+    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP));
+
+    Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
+    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
+
+    TaskName taskName = new TaskName("FooBar");
+    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName));
+    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName));
+  }
+
+  @Test
+  public void testIsTaskNameElastic() {
+    Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP));
+    Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP));
+    Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION));
+    Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
+  }
+
+  @Test
+  public void testGetElasticTaskNameParts() {
+    // JUnit convention: expected value first, actual value second
+    ElasticTaskNameParts taskNameParts = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION);
+    Assert.assertEquals(0, taskNameParts.partition);
+    Assert.assertEquals(ElasticTaskNameParts.DEFAULT_KEY_BUCKET, taskNameParts.keyBucket);
+    Assert.assertEquals(ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR, taskNameParts.elasticityFactor);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION);
+    Assert.assertEquals(0, taskNameParts.partition);
+    Assert.assertEquals(1, taskNameParts.keyBucket);
+    Assert.assertEquals(2, taskNameParts.elasticityFactor);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP);
+    Assert.assertEquals("systemA", taskNameParts.system);
+    Assert.assertEquals("streamB", taskNameParts.stream);
+    Assert.assertEquals(0, taskNameParts.partition);
+    Assert.assertEquals(ElasticTaskNameParts.DEFAULT_KEY_BUCKET, taskNameParts.keyBucket);
+    Assert.assertEquals(ElasticTaskNameParts.DEFAULT_ELASTICITY_FACTOR, taskNameParts.elasticityFactor);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP);
+    Assert.assertEquals("systemA", taskNameParts.system);
+    Assert.assertEquals("streamB", taskNameParts.stream);
+    Assert.assertEquals(0, taskNameParts.partition);
+    Assert.assertEquals(1, taskNameParts.keyBucket);
+    Assert.assertEquals(2, taskNameParts.elasticityFactor);
+
+    taskNameParts = ElasticityUtils.getTaskNameParts(new TaskName("FooBar"));
+    Assert.assertEquals(ElasticTaskNameParts.INVALID_PARTITION, taskNameParts.partition);
+  }
+
+  @Test
+  public void testIsOtherTaskAncestorDescendantOfCurrentTask() {
+    TaskName task0 = new TaskName("Partition 0");
+    TaskName task1 = new TaskName("Partition 1");
+    TaskName task002 = new TaskName("Partition 0_0_2");
+    TaskName task012 = new TaskName("Partition 0_1_2");
+    TaskName task004 = new TaskName("Partition 0_0_4");
+    TaskName task014 = new TaskName("Partition 0_1_4");
+    TaskName task024 = new TaskName("Partition 0_2_4");
+    TaskName task034 = new TaskName("Partition 0_3_4");
+
+    TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
+    TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2");
+    TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
+    TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4");
+    TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4");
+    TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4");
+    TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4");
+
+    // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself
+    // and all these tasks are descendants of Partition 0 (except itself)
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0));
+    Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0));
+
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034));
+
+    // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself
+    // these tasks are descendants of 0_0_2
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002));
+
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024));
+
+    // SystemStreamPartition [systemA, streamB, 0]
+    // is ancestor of all tasks SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself
+    // and all these tasks are descendants of SystemStreamPartition [systemA, streamB, 0] (except itself)
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, sspTask0));
+
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034));
+
+    // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of
+    // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself
+    // similarly, these tasks are descendants of SystemStreamPartition [systemA, streamB, 0, 0]_2
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002));
+
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004));
+    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024));
+  }
+
+  @Test
+  public void testGetAncestorAndDescendantCheckpoints() {
+    TaskName taskName = new TaskName("Partition 0_0_2");
+    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
+    Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1");
+    Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2");
+    Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3");
+    Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4");
+    Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5");
+    Set<Checkpoint> ansCheckpointSet = new HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2));
+    Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2));
+
+    checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1);
+    checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2);
+    checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1);
+    checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2);
+    // keyBucket 1 % 2 = 1, so 0_1_4 descends from 0_1_2 rather than 0_0_2
+    checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint);
+
+    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result =
+        ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
+    Set<Checkpoint> ancestorCheckpointSet = result.getLeft();
+    Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4);
+
+    Assert.assertTrue("should contain all ancestors' checkpoints",
+        ancestorCheckpointSet.containsAll(ansCheckpointSet));
+    Assert.assertFalse("should not contain a descendant checkpoint in ancestor list",
+        ancestorCheckpointSet.contains(desCheckpoint1));
+    Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list",
+        ancestorCheckpointSet.contains(unrelCheckpoint));
+
+    Assert.assertTrue("should contain all descendants' checkpoints",
+        descendantCheckpointSetForEf4.containsAll(desCheckpointSet));
+    Assert.assertFalse("should not contain an ancestor checkpoint in descendant list",
+        descendantCheckpointSetForEf4.contains(ansCheckpoint1));
+    Assert.assertFalse("should not contain an unrelated checkpoint in descendant list",
+        descendantCheckpointSetForEf4.contains(unrelCheckpoint));
+  }
+
+  @Test
+  public void testGetOffsetForSSPInCheckpoint() {
+    String offset1 = "1111";
+    String offset2 = "2222";
+    // case 1: when looking for exact ssp
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
+    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
+    Assert.assertEquals(offset1, ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp));
+
+    // case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket)
+    SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1);
+    checkpoint1 = buildCheckpointV2(sspWithKB, offset2);
+    Assert.assertEquals(offset2, ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp));
+
+    // case 3: try getting offset for an ssp not present in the checkpoint -> should return null
+    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1));
+    Assert.assertNull(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2));
+  }
+
+  @Test
+  public void testGetMaxMinOffsetForSSPInCheckpointSet() {
+    String offset1 = "1111";
+    String offset2 = "2222";
+
+    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
+    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
+    Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2);
+    Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2));
+
+    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
+    // offset1 < offset2
+    Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1);
+    Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1);
+
+    // case 1: when exact ssp is in checkpoint set
+    Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
+    Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
+
+    // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket)
+    SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1);
+    Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
+    Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
+
+    // case 3: when ssp not in checkpoint set -> should receive null for min and max offset
+    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0));
+    Assert.assertNull(ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
+    Assert.assertNull(ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
+  }
+
+  /** Builds a checkpoint holding a single ssp offset plus a dummy state checkpoint marker. */
+  private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) {
+    return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset),
+        ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
+  }
+}