You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/20 17:59:40 UTC

[helix] branch master updated: Fix unexpected result when resuming a cluster from paused/maintenance mode. (#1698)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5414f86  Fix unexpected result when resuming a cluster from paused/maintenance mode. (#1698)
5414f86 is described below

commit 5414f86a44f2cdfc5cac2cbaa48f7ca2b5703ea8
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue Apr 20 10:59:30 2021 -0700

    Fix unexpected result when resuming a cluster from paused/maintenance mode. (#1698)
    
    This PR aims to fix a race condition that the resume event could be pushed to the event queue before the cluster status has been updated. In this case, when the event is processed, the controller might still think the cluster is in paused/maintenance mode. As a result, the resume event is discarded and the rebalance won't be done until the next event happens.
---
 .../helix/controller/GenericHelixController.java   | 64 +++++++++++-----------
 1 file changed, 32 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 951143d..4dad60f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -361,17 +361,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
     NotificationContext changeContext = new NotificationContext(manager);
     changeContext.setType(NotificationContext.Type.CALLBACK);
-    String uid = UUID.randomUUID().toString().substring(0, 8);
-    ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid);
-    event.addAttribute(AttributeName.EVENT_SESSION.name(),
-        changeContext.getManager().getSessionIdIfLead());
-    event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
-    event.addAttribute(AttributeName.changeContext.name(), changeContext);
-    event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
-    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
-
-    enqueueEvent(_taskEventQueue, event);
-    enqueueEvent(_eventQueue, event.clone(uid));
+    pushToEventQueues(eventType, changeContext, Collections.EMPTY_MAP);
     logger.info(String
         .format("Controller rebalance pipeline triggered with event type: %s for cluster %s",
             eventType, _clusterName));
@@ -1271,11 +1261,15 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
       HelixManager manager = changeContext.getManager();
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
+
       PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
       MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
-      _paused = updateControllerState(changeContext, pauseSignal, _paused);
-      _inMaintenanceMode =
-          updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode);
+      boolean prevPaused = _paused;
+      boolean prevInMaintenanceMode = _inMaintenanceMode;
+      _paused = updateControllerState(pauseSignal, _paused);
+      _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
+      triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode);
+
       enableClusterStatusMonitor(true);
       _clusterStatusMonitor.setEnabled(!_paused);
       _clusterStatusMonitor.setPaused(_paused);
@@ -1469,35 +1463,41 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
   }
 
-  private boolean updateControllerState(NotificationContext changeContext, PauseSignal signal,
-      boolean statusFlag) {
+  private boolean updateControllerState(PauseSignal signal, boolean statusFlag) {
     if (signal != null) {
-      // This logic is used for recording first time entering PAUSE/MAINTENCE mode
       if (!statusFlag) {
         statusFlag = true;
+        // This log is recorded for the first time entering PAUSE/MAINTENANCE mode
         logger.info(String.format("controller is now %s",
             (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
       }
     } else {
-      if (statusFlag) {
-        statusFlag = false;
-        logger.info("controller is now resumed from paused state");
-        String uid = UUID.randomUUID().toString().substring(0, 8);
-        ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume,
-            String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
-        event.addAttribute(AttributeName.EVENT_SESSION.name(),
-            changeContext.getManager().getSessionIdIfLead());
-        event.addAttribute(AttributeName.changeContext.name(), changeContext);
-        event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
-        event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
-        enqueueEvent(_eventQueue, event);
-        enqueueEvent(_taskEventQueue,
-            event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
-      }
+      statusFlag = false;
     }
     return statusFlag;
   }
 
+  /**
+   * Trigger a Resume Event if the cluster is back to activated.
+   * @param changeContext
+   * @param prevPaused the previous paused status.
+   * @param prevInMaintenanceMode the previous in maintenance mode status.
+   */
+  private void triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
+      boolean prevInMaintenanceMode) {
+    /**
+     * WARNING: the logic here is tricky.
+     * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still
+     * paused, the resume event should not be sent.
+     * 2. Only send resume event if the status is changed back to active. So we don't send multiple
+     * event unnecessarily.
+     */
+    if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
+      pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
+      logger.info("controller is now resumed from paused/maintenance state");
+    }
+  }
+
   // TODO: refactor this to use common/ClusterEventProcessor.
   @Deprecated
   private class ClusterEventProcessor extends Thread {