You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/19 07:17:58 UTC

[helix] branch ApplicationClusterManager updated: Add supporting APIs a isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
     new 7c312caf7 Add supporting APIs a isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)
7c312caf7 is described below

commit 7c312caf7b265b5bb4df8a59533d60ec979f3269
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Sep 19 00:17:52 2023 -0700

    Add supporting APIs a isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)
---
 .../src/main/java/org/apache/helix/HelixAdmin.java | 17 ++++++
 .../rebalancer/DelayedAutoRebalancer.java          |  5 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 70 ++++++++++++++++++++--
 .../rebalancer/TestInstanceOperation.java          | 16 ++++-
 .../java/org/apache/helix/mock/MockHelixAdmin.java | 10 ++++
 5 files changed, 108 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 14863f57e..085a987b1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -738,4 +738,21 @@ public interface HelixAdmin {
    */
   Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
       List<String> instancesNames);
+
+  /**
+   * Check whether the instance operation 'Evacuate' has finished for a single instance.
+   * @param clusterName name of the cluster that contains the instance
+   * @param instancesNames name of the one instance to check (a single name, despite the plural)
+   * @return true if there is neither current state nor pending message on the instance.
+   */
+  boolean isEvacuateFinished(String clusterName, String instancesNames);
+
+  /**
+   * Check whether an instance is ready to prepare for joining the cluster. The instance must have no current
+   * state, no pending message, and be tagged with an operation that excludes it from Helix assignment.
+   * @param clusterName name of the cluster that contains the instance
+   * @param instancesNames name of the one instance to check (a single name, despite the plural)
+   * @return true if the instance is ready to prepare for joining the cluster.
+   */
+  boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames);
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 56be04530..ff5824722 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -33,7 +33,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
@@ -56,7 +55,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
-  private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");
+   public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");
 
   @Override
   public IdealState computeNewIdealState(String resourceName,
@@ -205,7 +204,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       Set<String> nodes) {
     return nodes.stream()
         .filter(
-            instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
+            instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
         .collect(Collectors.toList());
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 15b38fcbc..44afee5e1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,6 +47,7 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
@@ -57,6 +58,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -407,10 +409,70 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void enableResource(final String clusterName, final String resourceName,
-      final boolean enabled) {
-    logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
-        clusterName);
+  public boolean isEvacuateFinished(String clusterName, String instanceName) {
+    return !instanceHasCurrentSateOrMessage(clusterName, instanceName) && (getInstanceConfig(clusterName,
+        instanceName).getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name())); // must still be tagged EVACUATE
+  }
+
+  @Override
+  public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
+    return !instanceHasCurrentSateOrMessage(clusterName, instanceName) // no current state or pending message reported
+        && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
+        getInstanceConfig(clusterName, instanceName).getInstanceOperation()); // operation excludes it from assignment
+  }
+
+  /**
+   * Return true if the instance has leftover session znodes, any pending message, or any current state. Return false
+   * if the instance is not live, its session has no current state node, or it is live with nothing pending or held.
+   * @param clusterName name of the cluster the instance belongs to
+   * @param instanceName name of the instance to inspect
+   * @return true if the instance still carries current state or pending messages, false otherwise
+   */
+  private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // check the instance is alive
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName,
+          clusterName);
+      return false;
+    }
+
+    // count number of sessions under CurrentState folder. If it is carrying over from prev session,
+    // then there are > 1 session ZNodes. A null child list is treated as no carry-over instead of throwing.
+    List<String> sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0);
+    if (sessions != null && sessions.size() > 1) {
+      logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName,
+          clusterName);
+      return true;
+    }
+
+    String sessionId = liveInstance.getEphemeralOwner();
+
+    String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
+    List<String> currentStates = baseAccessor.getChildNames(path, 0);
+    if (currentStates == null) {
+      logger.warn("Instance {} in cluster {} does not have live session.  The instance can be removed anyway.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    // see if instance has pending message.
+    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
+    if (messages != null && !messages.isEmpty()) {
+      logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName);
+      return true;
+    }
+
+    return !currentStates.isEmpty();
+  }
+
+  @Override
+  public void enableResource(final String clusterName, final String resourceName, final boolean enabled) {
+    logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName);
     String path = PropertyPathBuilder.idealState(clusterName, resourceName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
     if (!baseAccessor.exists(path, 0)) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 3f459318b..6c51d58bb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -11,17 +11,18 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
-import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -57,6 +58,8 @@ public class TestInstanceOperation extends ZkTestBase {
   private ZkHelixClusterVerifier _clusterVerifier;
   private ConfigAccessor _configAccessor;
   private long _stateModelDelay = 3L;
+
+  private HelixAdmin _admin;
   protected AssignmentMetadataStore _assignmentMetadataStore;
   HelixDataAccessor _dataAccessor;
 
@@ -91,6 +94,8 @@ public class TestInstanceOperation extends ZkTestBase {
     createTestDBs(200);
 
     setUpWagedBaseline();
+
+    _admin = new ZKHelixAdmin(_gZkClient);
   }
 
   @Test
@@ -106,7 +111,6 @@ public class TestInstanceOperation extends ZkTestBase {
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
 
-    System.out.println("123");
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // New ev should contain all instances but the evacuated one
@@ -119,6 +123,9 @@ public class TestInstanceOperation extends ZkTestBase {
       Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
       Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
     }
+
+    Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+    Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
   }
 
   @Test(dependsOnMethods = "testEvacuate")
@@ -215,6 +222,8 @@ public class TestInstanceOperation extends ZkTestBase {
       TestHelper.verify(
           () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
     }
+    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+    Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
 
     // sleep a bit so ST messages can start executing
     Thread.sleep(Math.abs(_stateModelDelay / 100));
@@ -261,6 +270,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // message should be pending at the to evacuate participant
     TestHelper.verify(
         () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
+    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
 
     // cancel evacuation
     _gSetupTool.getClusterManagementTool()
@@ -323,7 +333,7 @@ public class TestInstanceOperation extends ZkTestBase {
       Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
       Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
     }
-
+    Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
   }
 
   @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 81993475b..512a7b4db 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -550,4 +550,14 @@ public class MockHelixAdmin implements HelixAdmin {
       List<String> instancesNames) {
     return null;
   }
+
+  @Override
+  public boolean isEvacuateFinished(String clusterName, String instancesNames) {
+    return false; // mock stub: always reports the evacuation as not finished
+  }
+
+  @Override
+  public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
+    return false; // mock stub: always reports the instance as not ready
+  }
 }