Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/02 21:06:16 UTC

[GitHub] [samza] shanthoosh commented on a change in pull request #1484: SAMZA-2633: Rolling upgrades cause downtime to upgraded processors for the entire deployment window

shanthoosh commented on a change in pull request #1484:
URL: https://github.com/apache/samza/pull/1484#discussion_r606419141



##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -274,6 +283,20 @@ void doOnProcessorChange() {
     LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
     JobModel newJobModel = generateNewJobModel(processorNodes);
 
+    /*
+     * The leader skips the rebalance, even if the quorum has changed, as long as the work assignment remains the same
+     * across all the processors. The optimization is useful in the following scenarios:
+     *   1. A processor in the quorum restarts within the debounce window. Previously, this triggered a rebalance in
+     *      which every processor stopped and restarted its work assignment, which is detrimental to the availability
+     *      of the system. This is a common scenario during rolling upgrades.
+     *   2. Processors in the quorum have no work assignment, so their failures/restarts don't impact the
+     *      quorum.
+     */
+    if (newJobModel.equals(activeJobModel)) {

Review comment:
       This check compares both the config and the work assignments when testing two JobModels for equality.
   
   Standalone allows users to launch processors with different configs in the same quorum, so JobModels with identical work assignments may still compare unequal because of their configs. It would be better to narrow this check to the work assignment alone to achieve the desired effect.
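   A minimal sketch of the narrower check, assuming a simplified stand-in for JobModel: `SimpleJobModel`, its fields, `RebalanceCheck` and `hasSameWorkAssignment` are hypothetical names for illustration, not Samza's API. The comparison looks only at the processor-to-task mapping and deliberately ignores config.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified stand-in for a job model: config plus a processor -> tasks assignment. */
final class SimpleJobModel {
  final Map<String, String> config;
  final Map<String, Set<String>> assignments; // processorId -> assigned task names

  SimpleJobModel(Map<String, String> config, Map<String, Set<String>> assignments) {
    this.config = config;
    this.assignments = assignments;
  }
}

final class RebalanceCheck {
  /**
   * Returns true when both models assign exactly the same tasks to the same processors.
   * Config differences are intentionally ignored.
   */
  static boolean hasSameWorkAssignment(SimpleJobModel newModel, SimpleJobModel activeModel) {
    if (newModel == null || activeModel == null) {
      return false; // no active model to compare against, so a rebalance is still needed
    }
    // Map.equals and Set.equals compare contents, independent of insertion order.
    return Objects.equals(newModel.assignments, activeModel.assignments);
  }
}
```

   In the PR, this would roughly correspond to replacing the full `newJobModel.equals(activeJobModel)` comparison with a comparison of the container/task assignments only.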

##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
##########
@@ -27,6 +27,7 @@
  *   - /
  *      |- groupId/
  *          |- JobModelGeneration/
+ *              |- activeJobModelVersion (data contains the most recent active job model version)

Review comment:
       Semantically, what is the difference between activeJobModelVersion and JobModelVersion? In what scenarios would the two differ?

##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -474,11 +497,55 @@ void setActiveJobModel(JobModel jobModel) {
     activeJobModel = jobModel;
   }
 
+  @VisibleForTesting
+  void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
+    debounceTimer = scheduleAfterDebounceTime;
+  }
+
   @VisibleForTesting
   void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade barrierUpgradeForVersion) {
     barrier = barrierUpgradeForVersion;
   }
 
+  /**
+   * Start the processor with the last known active job model. It is safe to start with the last active job model
+   * version in all scenarios except a concurrent rebalance. We define safe as ensuring that no two processors in
+   * the quorum have overlapping work assignments.
+   * In case of a concurrent rebalance, there are two scenarios:
+   *   1. The job model version update happens before processor registration
+   *   2. The job model version update happens after processor registration
+   * Since ZK guarantees FIFO order for client operations, the processor is guaranteed to see all the state up to
+   * its own registration.
+   * For scenario 1, due to the above guarantee, the processor will not start with the old assignment because the
+   * latest and last active versions mismatch. (If they match, the scenario reduces to one of the safe scenarios.)
+   *
+   * For scenario 2, the processor may not see the leader's writes about the job model version change, but it will
+   * eventually receive a notification of the change and act on it (potentially stopping the work assignment if it
+   * is not part of the job model).
+   *
+   * In the scenario where the processor doesn't start with last active job model version, it will continue to follow
+   * the old protocol where leader should get notified about the processor registration and potentially trigger
+   * rebalance and notify about changes in work assignment after consensus.
+   * TODO: SAMZA-2635: Rebalances in standalone doesn't handle DAG changes for restarted processor
+   */
+  @VisibleForTesting
+  void startWorkWithLastActiveJobModel() {
+    LOG.info("Starting the processor with the recent active job model");
+    String lastActiveJobModelVersion = zkUtils.getLastActiveJobModelVersion();
+    String latestJobModelVersion = zkUtils.getJobModelVersion();
+
+    if (lastActiveJobModelVersion != null && lastActiveJobModelVersion.equals(latestJobModelVersion)) {
+      final JobModel lastActiveJobModel = readJobModelFromMetadataStore(lastActiveJobModelVersion);
+
+      /*
+       * TODO: A temporary workaround since job model can be started only if the stream processor is in the rebalance

Review comment:
       Can you please create a follow-up ticket for this in Samza and link it here? That way this context is not lost in the code comments alone.
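   For reference, the startup decision in `startWorkWithLastActiveJobModel` can be condensed into a small sketch. It assumes the two inputs are the values returned by `zkUtils.getLastActiveJobModelVersion()` and `zkUtils.getJobModelVersion()`; `StartupAction`, `StartupDecision` and `decide` are hypothetical names, not part of the PR.

```java
/** Hypothetical outcome of the startup check; names are illustrative, not Samza API. */
enum StartupAction {
  START_WITH_LAST_ACTIVE_MODEL,
  FOLLOW_REBALANCE_PROTOCOL
}

final class StartupDecision {
  /**
   * @param lastActiveVersion value read via zkUtils.getLastActiveJobModelVersion(); null if none recorded
   * @param latestVersion     value read via zkUtils.getJobModelVersion()
   */
  static StartupAction decide(String lastActiveVersion, String latestVersion) {
    if (lastActiveVersion != null && lastActiveVersion.equals(latestVersion)) {
      // No newer version has been published since the last one became active; per the
      // Javadoc's safety argument, reusing the last active assignment should not overlap
      // with another processor's assignment.
      return StartupAction.START_WITH_LAST_ACTIVE_MODEL;
    }
    // Either nothing was ever active, or a concurrent rebalance published a newer version:
    // register and let the leader drive the rebalance as in the old protocol.
    return StartupAction.FOLLOW_REBALANCE_PROTOCOL;
  }
}
```

   Keeping the check a pure function of the two version strings makes the scenario-1 behavior (mismatch means fall back) easy to unit test without a ZooKeeper instance.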




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org