You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/04/03 00:17:13 UTC

[samza] branch master updated: SAMZA-2633: Rolling upgrades cause downtime to upgraded processors for the entire deployment window (#1484)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 86932b9  SAMZA-2633: Rolling upgrades cause downtime to upgraded processors for the entire deployment window (#1484)
86932b9 is described below

commit 86932b9c36a7e69cd0189e522835bb2af6bab634
Author: mynameborat <bh...@gmail.com>
AuthorDate: Fri Apr 2 17:17:06 2021 -0700

    SAMZA-2633: Rolling upgrades cause downtime to upgraded processors for the entire deployment window (#1484)
    
    Description:
    During rolling upgrades, the current debounce timer gets extended every time there is a quorum change notification. As a result, processors that were upgraded earlier in the deployment window remain unavailable while waiting for work assignment. In some scenarios, this causes processors to be unavailable for 20 minutes or so, depending on the size of the quorum and the debounce time configuration. Refer to SAMZA-2633 for more information.
    
    Changes:
    Optimize the leader workflow to skip the rebalance if there are no changes to the work assignment
    Make processors start with most recent agreed job model on startup
    Leader persists the active job model version in ZK to enable change [2]
    Introduce config for applications to opt-in for the optimization
    
    Usage Instructions:
    Set job.coordinator.zk.enable-startup-with-active-job-model to true as part of the application configuration to enable processors to use the most recent active job model during startup and to enable the leader to skip rebalances if the work assignment remains the same.
---
 .../versioned/jobs/samza-configurations.md         |   1 +
 .../java/org/apache/samza/config/ZkConfig.java     |   5 +
 .../org/apache/samza/job/model/JobModelUtil.java   |  17 ++--
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  93 ++++++++++++++++-
 .../java/org/apache/samza/zk/ZkKeyBuilder.java     |  12 ++-
 .../src/main/java/org/apache/samza/zk/ZkUtils.java |  36 +++++++
 .../apache/samza/job/model/TestJobModelUtil.java   |  27 +++--
 .../org/apache/samza/zk/TestZkJobCoordinator.java  | 111 ++++++++++++++++++++-
 8 files changed, 281 insertions(+), 21 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 96e994a..596dc52 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -373,6 +373,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.coordinator.zk.connection.timeout.ms|60000|Zookeeper connection timeout in milliseconds. Zk connection timeout controls how long client tries to connect to ZK server before giving up.|
 |job.coordinator.zk.consensus.timeout.ms|40000|Zookeeper-based coordination. How long each processor will wait for all the processors to report acceptance of the new job model before rolling back.|
 |job.debounce.time.ms|20000|Zookeeper-based coordination. How long the Leader processor will wait before recalculating the JobModel on change of registered processors.|
+|job.coordinator.zk.enable-startup-with-active-job-model|false|Enable stream processors to run with the active job model version on startup without waiting for the leader to trigger a rebalance. It is useful in scenarios where processors leave the quorum and come back within the debounce time and the work assignment for the new quorum remains unchanged. If disabled, processors will wait for the leader to generate and notify the work assignment.|
 
 ### <a name="metrics"></a>[6. Metrics](#metrics)
 |Name|Default|Description|
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index 21bfe76..a8788fe 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -20,6 +20,7 @@
 package org.apache.samza.config;
 
 public class ZkConfig extends MapConfig {
+  public static final String STARTUP_WITH_ACTIVE_JOB_MODEL = "job.coordinator.zk.enable-startup-with-active-job-model";
   // Connection string for ZK, format: :<hostname>:<port>,..."
   public static final String ZK_CONNECT = "job.coordinator.zk.connect";
   public static final String ZK_SESSION_TIMEOUT_MS = "job.coordinator.zk.session.timeout.ms";
@@ -34,6 +35,10 @@ public class ZkConfig extends MapConfig {
     super(config);
   }
 
+  public boolean getEnableStartupWithActiveJobModel() {
+    return getBoolean(STARTUP_WITH_ACTIVE_JOB_MODEL, false);
+  }
+
   public String getZkConnect() {
     if (!containsKey(ZK_CONNECT)) {
       throw new ConfigException("Missing " + ZK_CONNECT + " config!");
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index f1d6073..b529f7f 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.job.model;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -138,18 +138,17 @@ public class JobModelUtil {
       return false;
     }
 
-    return compareContainerModel(first.getContainers().get(processorId), second.getContainers().get(processorId));
+    return Objects.equals(first.getContainers().get(processorId), second.getContainers().get(processorId));
   }
 
   /**
-   * Helper method to compare the two input {@link ContainerModel}s.
-   * @param first first container model
-   * @param second second container model
-   * @return true - if two input {@link ContainerModel} are equal
+   * Compares the {@link ContainerModel}s across the two {@link JobModel}s.
+   * @param first first job model
+   * @param second second job model
+   * @return true - if {@link ContainerModel}s are the same across {@link JobModel}s
    *         false - otherwise
    */
-  @VisibleForTesting
-  static boolean compareContainerModel(ContainerModel first, ContainerModel second) {
+  public static boolean compareContainerModels(JobModel first, JobModel second) {
     if (first == second) {
       return true;
     }
@@ -158,7 +157,7 @@ public class JobModelUtil {
       return false;
     }
 
-    return first.equals(second);
+    return Objects.equals(first.getContainers(), second.getContainers());
   }
 
   private static String getJobModelKey(String version) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 102f357..bc24752 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -94,12 +94,14 @@ public class ZkJobCoordinator implements JobCoordinator {
    **/
   private static final String ON_ZK_CLEANUP = "OnCleanUp";
 
+  // Action name when the processor starts with last agreed job model upon start
+  static final String START_WORK_WITH_LAST_ACTIVE_JOB_MODEL = "StartWorkWithLastActiveJobModel";
+
   private final ZkUtils zkUtils;
   private final String processorId;
 
   private final Config config;
   private final ZkJobCoordinatorMetrics metrics;
-  private final ZkLeaderElector leaderElector;
   private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
   private final StreamMetadataCache streamMetadataCache;
   private final SystemAdmins systemAdmins;
@@ -117,6 +119,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   private boolean hasLoadedMetadataResources = false;
   private String cachedJobModelVersion = null;
   private ZkBarrierForVersionUpgrade barrier;
+  private ZkLeaderElector leaderElector;
 
   @VisibleForTesting
   ZkSessionMetrics zkSessionMetrics;
@@ -163,12 +166,22 @@ public class ZkJobCoordinator implements JobCoordinator {
   public void start() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
     zkUtils.validateZkVersion();
-    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()});
+    zkUtils.validatePaths(new String[]{
+        keyBuilder.getProcessorsPath(),
+        keyBuilder.getJobModelVersionPath(),
+        keyBuilder.getActiveJobModelVersionPath(),
+        keyBuilder.getJobModelPathPrefix(),
+        keyBuilder.getTaskLocalityPath()});
 
     this.jobModelMetadataStore.init();
     systemAdmins.start();
     leaderElector.tryBecomeLeader();
     zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
+
+    if (new ZkConfig(config).getEnableStartupWithActiveJobModel()) {
+      debounceTimer.scheduleAfterDebounceTime(START_WORK_WITH_LAST_ACTIVE_JOB_MODEL, 0,
+          this::startWorkWithLastActiveJobModel);
+    }
   }
 
   @Override
@@ -274,6 +287,21 @@ public class ZkJobCoordinator implements JobCoordinator {
     LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
     JobModel newJobModel = generateNewJobModel(processorNodes);
 
+    /*
+     * Leader skips the rebalance even if there are changes in the quorum as long as the work assignment remains the same
+     * across all the processors. The optimization is useful in the following scenarios
+     *   1. The processor in the quorum restarts within the debounce window. Originally, this would trigger rebalance
+     *      across the processors stopping and starting their work assignment which is detrimental to availability of
+     *      the system. e.g. common scenario during rolling upgrades
+     *   2. Processors in the quorum which don't have work assignment and their failures/restarts don't impact the
+     *      quorum.
+     */
+    if (new ZkConfig(config).getEnableStartupWithActiveJobModel() &&
+        JobModelUtil.compareContainerModels(newJobModel, activeJobModel)) {
+      LOG.info("Skipping rebalance since there are no changes in work assignment");
+      return;
+    }
+
     // Create checkpoint and changelog streams if they don't exist
     if (!hasLoadedMetadataResources) {
       loadMetadataResources(newJobModel);
@@ -412,6 +440,8 @@ public class ZkJobCoordinator implements JobCoordinator {
       return;
     }
 
+    LOG.info("Checking for work assignment changes for processor {} between active job model {} and new job model {}",
+        processorId, activeJobModel, newJobModel);
     if (JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
       LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);
     } else {
@@ -475,11 +505,60 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   @VisibleForTesting
+  void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
+    debounceTimer = scheduleAfterDebounceTime;
+  }
+
+  @VisibleForTesting
+  void setLeaderElector(ZkLeaderElector zkLeaderElector) {
+    leaderElector = zkLeaderElector;
+  }
+
+  @VisibleForTesting
   void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade barrierUpgradeForVersion) {
     barrier = barrierUpgradeForVersion;
   }
 
   /**
+   * Start the processor with the last known active job model. It is safe to start with last active job model
+   * version in all the scenarios unless in the event of concurrent rebalance. We define safe as a way to ensure that no
+   * two processors in the quorum have overlapping work assignments.
+   * In case of a concurrent rebalance, there are two scenarios:
+   *   1. Job model version update happens before processor registration
+   *   2. Job model version update happens after processor registration
+   * ZK guarantees FIFO order for client operations, the processor is guaranteed to see all the state up until its
+   * own registration.
+   * For scenario 1, due to above guarantee, the processor will not start with old assignment due to mismatch in
+   * latest vs last active. (If there is no mismatch, the scenario reduces to one of the safe scenarios)
+   *
+   * For scenario 2, it is possible for the processor to not see the writes by the leader about job model version change
+   * but it will eventually receive a notification on the job model version change and act on it (potentially stop
+   * the work assignment if it's not part of the job model).
+   *
+   * In the scenario where the processor doesn't start with last active job model version, it will continue to follow
+   * the old protocol where leader should get notified about the processor registration and potentially trigger
+   * rebalance and notify about changes in work assignment after consensus.
+   * TODO: SAMZA-2635: Rebalances in standalone doesn't handle DAG changes for restarted processor
+   */
+  @VisibleForTesting
+  void startWorkWithLastActiveJobModel() {
+    LOG.info("Starting the processor with the recent active job model");
+    String lastActiveJobModelVersion = zkUtils.getLastActiveJobModelVersion();
+    String latestJobModelVersion = zkUtils.getJobModelVersion();
+
+    if (lastActiveJobModelVersion != null && lastActiveJobModelVersion.equals(latestJobModelVersion)) {
+      final JobModel lastActiveJobModel = readJobModelFromMetadataStore(lastActiveJobModelVersion);
+
+      /*
+       * TODO: SAMZA-2645: Allow onNewJobModel as a valid state transition. Due to this limitation, we are forced
+       *  to invoke onJobModelExpired even if there is nothing to expire.
+       */
+      checkAndExpireJobModel(lastActiveJobModel);
+      onNewJobModel(lastActiveJobModel);
+    }
+  }
+
+  /**
    * Builds the {@link GrouperMetadataImpl} based upon provided {@param jobModelVersion}
    * and {@param processorNodes}.
    * @param jobModelVersion the most recent jobModelVersion available in the zookeeper.
@@ -554,6 +633,16 @@ public class ZkJobCoordinator implements JobCoordinator {
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
           LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+          /*
+           * Publish the active job model version separately to denote that the job model version is agreed by
+           * the quorum. The active job model version is used by processors as an optimization during their startup
+           * so that processors can start with the work assignment that was agreed by the quorum and allows the
+           * leader to skip the rebalance if there is no change in the work assignment for the quorum across
+           * quorum changes (processors leaving or joining)
+           */
+          if (leaderElector.amILeader()) {
+            zkUtils.publishActiveJobModelVersion(version);
+          }
           onNewJobModel(getJobModel());
         });
       } else {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 60a1e63..fea54ce 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -27,6 +27,7 @@ import com.google.common.base.Strings;
  *   - /
  *      |- groupId/
  *          |- JobModelGeneration/
+ *              |- activeJobModelVersion (data contains the most recent active job model version)
  *              |- jobModelVersion (data contains the version)
  *              |- jobModelUpgradeBarrier/ (contains barrier related data)
  *              |- jobModels/
@@ -42,7 +43,6 @@ import com.google.common.base.Strings;
  * This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
  */
 public class ZkKeyBuilder {
-
   static final String PROCESSORS_PATH = "processors";
   static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration";
   static final String JOB_MODEL_UPGRADE_BARRIER_PATH = "jobModelUpgradeBarrier";
@@ -89,6 +89,16 @@ public class ZkKeyBuilder {
     return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
 
+  /**
+   * Denotes the path where the most recent active job model version is stored. The version of the job model is the
+   * most recent agreed upon version by the quorum. It differs from the <i>jobModelVersion</i> path which may
+   * have a newer version of job model published by the leader during rebalance before consensus is achieved.
+   * @return the path where most recent active job model is stored
+   */
+  String getActiveJobModelVersionPath() {
+    return String.format("%s/%s/activeJobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
+  }
+
   String getJobModelPathPrefix() {
     return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 72e8f8d..3084be8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -503,6 +503,16 @@ public class ZkUtils {
   }
 
   /**
+   * Read the version of the job model that is the most recent agreed version by the quorum.
+   * @return most recent active job model version
+   */
+  public String getLastActiveJobModelVersion() {
+    String lastActiveJobModelVersion = zkClient.readData(keyBuilder.getActiveJobModelVersionPath(), true);
+    metrics.reads.inc();
+    return lastActiveJobModelVersion;
+  }
+
+  /**
    * Generates the next JobModel version that should be used by a processor group in a rebalancing phase
    * for coordination.
    * @param currentJobModelVersion the current version of JobModel.
@@ -555,6 +565,32 @@ public class ZkUtils {
         "(actual data version after update = " + stat.getVersion() + ")");
   }
 
+  /**
+   * Publish the most recent job model version that is agreed upon by the quorum.
+   * @param version active job model version
+   */
+  public void publishActiveJobModelVersion(String version) {
+    try {
+      zkClient.writeData(keyBuilder.getActiveJobModelVersionPath(), version);
+      metrics.writes.inc();
+      LOG.info("Published the active job model version = {} to zookeeper successfully.", version);
+    } catch (Exception e) {
+      /*
+       * Failure to write the most recent job model version has the following implications
+       *  1. Processors no longer benefit from the optimization to start the container upon restarts based
+       *     on the most recent active job model. It is useful in scenarios where processors leave the quorum and
+       *     come back within debounce time and the work assignment for new quorum remains unchanged.
+       *  2. During rolling upgrades, processors that are upgraded initially will wait for the min(deployment
+       *     window, T + debounce time) where T is the time at which the last change notification of the quorum was received
+       *     by the leader.
+       *
+       *  That said, failures don't impact correctness and it is better to continue processing as opposed to bringing
+       *  down the processor as a fatal error.
+       */
+      LOG.warn("Failed to persist the active job model version = {} due to {}", version, e);
+    }
+  }
+
   // validate that Zk protocol currently used by the job is the same as in this participant
   public void validateZkVersion() {
 
diff --git a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 856b2f8..2e9b9e4 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -38,15 +38,28 @@ public class TestJobModelUtil {
   private static final String PROCESSOR_ID = "testProcessor";
 
   @Test
-  public void testCompareContainerModel() {
-    assertTrue("Expecting null container models to return true", JobModelUtil.compareContainerModel(null, null));
+  public void testCompareContainerModels() {
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobModel first = mock(JobModel.class);
+    final JobModel second = mock(JobModel.class);
+    final String testProcessor2 = "testProcessor2";
 
-    assertFalse("Expecting false for two different container model",
-        JobModelUtil.compareContainerModel(mock(ContainerModel.class), mock(ContainerModel.class)));
+    when(first.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel));
+    when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel));
+
+    assertTrue("Expecting null job models to return true", JobModelUtil.compareContainerModels(null, null));
+    assertTrue("Expecting true for job model with same container model",
+        JobModelUtil.compareContainerModels(first, second));
+
+    when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mock(ContainerModel.class)));
+    assertFalse("Expecting false for two different job model",
+        JobModelUtil.compareContainerModels(first, second));
+
+    when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel, testProcessor2,
+        mock(ContainerModel.class)));
+    assertFalse("Expecting false for two different job model",
+        JobModelUtil.compareContainerModels(first, second));
 
-    final ContainerModel mockContainerModel = mock(ContainerModel.class);
-    assertTrue("Expecting true for same container model",
-        JobModelUtil.compareContainerModel(mockContainerModel, mockContainerModel));
   }
 
   @Test(expected = IllegalArgumentException.class)
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index bf681cb..ce81070 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -32,6 +32,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
@@ -53,6 +54,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
@@ -68,11 +70,11 @@ import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class TestZkJobCoordinator {
+  private static final String LATEST_JOB_MODEL_VERSION = "2";
   private static final String PROCESSOR_ID = "testProcessor";
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
 
-
   private final Config config;
   private final JobModel jobModel;
   private final MetadataStore zkMetadataStore;
@@ -95,7 +97,8 @@ public class TestZkJobCoordinator {
     Map<String, String> configMap = ImmutableMap.of(
         "job.coordinator.system", "kafka",
         "job.name", "test-job",
-        "systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
+        "systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory",
+        ZkConfig.STARTUP_WITH_ACTIVE_JOB_MODEL, "true");
     config = new MapConfig(configMap);
 
     Set<SystemStreamPartition> ssps = ImmutableSet.of(
@@ -310,6 +313,91 @@ public class TestZkJobCoordinator {
   }
 
   @Test
+  public void testStartWithActiveJobModelDisabled() {
+    final ScheduleAfterDebounceTime mockDebounceTimer = mock(ScheduleAfterDebounceTime.class);
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setLeaderElector(mock(ZkLeaderElector.class));
+    zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+
+    zkJobCoordinator.start();
+
+    verifyZeroInteractions(mockDebounceTimer);
+  }
+
+  @Test
+  public void testStartWithActiveJobModelEnabled() {
+    final ScheduleAfterDebounceTime mockDebounceTimer = mock(ScheduleAfterDebounceTime.class);
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, config, new NoOpMetricsRegistry(), zkUtils,
+        zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setLeaderElector(mock(ZkLeaderElector.class));
+    zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+
+    zkJobCoordinator.start();
+
+    verify(mockDebounceTimer, times(1)).scheduleAfterDebounceTime(
+        eq(ZkJobCoordinator.START_WORK_WITH_LAST_ACTIVE_JOB_MODEL),
+        anyLong(),
+        any());
+  }
+
+  @Test
+  public void testStartWorkWithLastActiveJobModel() {
+    final TaskName taskName = new TaskName("task1");
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    final JobModel mockJobModel = mock(JobModel.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
+
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, mock(TaskModel.class)));
+    when(mockJobModel.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, mockContainerModel));
+    when(zkUtils.getLastActiveJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+    when(zkUtils.getJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+    doReturn(mockJobModel).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
+
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.startWorkWithLastActiveJobModel();
+    verify(mockListener, times(1)).onJobModelExpired();
+    verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());
+    verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, mockJobModel);
+    assertEquals("Active job model should be updated with the new job model", mockJobModel,
+        zkJobCoordinator.getActiveJobModel());
+  }
+
+  @Test
+  public void testStartWorkWithLastActiveJobModelShouldNotStartContainer() {
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+
+    zkJobCoordinator.setListener(mockListener);
+
+    when(zkUtils.getLastActiveJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+    when(zkUtils.getJobModelVersion()).thenReturn(LATEST_JOB_MODEL_VERSION);
+
+    zkJobCoordinator.startWorkWithLastActiveJobModel();
+
+    verifyZeroInteractions(mockListener);
+    assertNull("Expected active job model to be null", zkJobCoordinator.getActiveJobModel());
+  }
+
+  @Test
+  public void testStartWorkWithLastActiveJobModelWithNullActiveJobModelVersion() {
+    final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore);
+
+    zkJobCoordinator.setListener(mockListener);
+
+    zkJobCoordinator.startWorkWithLastActiveJobModel();
+
+    verifyZeroInteractions(mockListener);
+    assertNull("Expected active job model to be null", zkJobCoordinator.getActiveJobModel());
+  }
+
+  @Test
   public void testLoadMetadataResources() throws IOException {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
@@ -351,6 +439,25 @@ public class TestZkJobCoordinator {
     verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
   }
 
+  @Test
+  public void testDoOnProcessorChangeWithNoChangesToWorkAssignment() {
+    ZkBarrierForVersionUpgrade mockBarrier = mock(ZkBarrierForVersionUpgrade.class);
+    ScheduleAfterDebounceTime mockDebounceTimer = mock(ScheduleAfterDebounceTime.class);
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(PROCESSOR_ID, config,
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
+    zkJobCoordinator.setActiveJobModel(jobModel);
+    zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+    zkJobCoordinator.setZkBarrierUpgradeForVersion(mockBarrier);
+
+    doReturn(jobModel).when(zkJobCoordinator).generateNewJobModel(any());
+    zkJobCoordinator.doOnProcessorChange();
+
+    verify(zkUtils, times(0)).publishJobModelVersion(anyString(), anyString());
+    verifyZeroInteractions(mockBarrier);
+    verifyZeroInteractions(mockDebounceTimer);
+    verify(zkJobCoordinator, times(0)).loadMetadataResources(any());
+  }
+
   private void testNoChangesInWorkAssignmentHelper(BiConsumer<ZkJobCoordinator, JobModel> testMethod,
       BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod) {
     final JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);