You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/19 07:17:58 UTC
[helix] branch ApplicationClusterManager updated: Add supporting APIs isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
new 7c312caf7 Add supporting APIs isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)
7c312caf7 is described below
commit 7c312caf7b265b5bb4df8a59533d60ec979f3269
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Sep 19 00:17:52 2023 -0700
Add supporting APIs isEvacuateFinished and isReadyForPreparingJoiningCluster for evacuation. (#2618)
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 17 ++++++
.../rebalancer/DelayedAutoRebalancer.java | 5 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 70 ++++++++++++++++++++--
.../rebalancer/TestInstanceOperation.java | 16 ++++-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 10 ++++
5 files changed, 108 insertions(+), 10 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 14863f57e..085a987b1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -738,4 +738,21 @@ public interface HelixAdmin {
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);
+
+  /**
+   * Return whether instance operation 'EVACUATE' is finished for the given instance.
+   * @param clusterName the name of the cluster the instance belongs to
+   * @param instanceName the name of the single instance to check
+   * @return true if the instance is marked with the EVACUATE operation and has neither current
+   *         state nor pending messages; false otherwise
+   */
+  boolean isEvacuateFinished(String clusterName, String instanceName);
+
+  /**
+   * Return whether the instance is ready for preparing joining the cluster. The instance must have
+   * no current state and no pending message, and must be tagged with an instance operation that
+   * excludes it from Helix assignment.
+   * @param clusterName the name of the cluster the instance belongs to
+   * @param instanceName the name of the single instance to check
+   * @return true if the instance is ready for preparing joining the cluster; false otherwise
+   */
+  boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 56be04530..ff5824722 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -33,7 +33,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
@@ -56,7 +55,7 @@ import org.slf4j.LoggerFactory;
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
- private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");
+ /**
+  * Instance operation names whose instances are filtered out of the candidate nodes for replica
+  * assignment. Public so that admin APIs can apply the same exclusion rule.
+  * Note: built with Set.of, so contains(null) throws NullPointerException; callers must guard nulls.
+  */
+ public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");
@Override
public IdealState computeNewIdealState(String resourceName,
@@ -205,7 +204,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
Set<String> nodes) {
return nodes.stream()
.filter(
- instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
+ instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 15b38fcbc..44afee5e1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,6 +47,7 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
@@ -57,6 +58,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -407,10 +409,70 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void enableResource(final String clusterName, final String resourceName,
- final boolean enabled) {
- logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
- clusterName);
+ public boolean isEvacuateFinished(String clusterName, String instanceName) {
+ return !instanceHasCurrentSateOrMessage(clusterName, instanceName) && (getInstanceConfig(clusterName,
+ instanceName).getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name()));
+ }
+
+ @Override
+ public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
+ return !instanceHasCurrentSateOrMessage(clusterName, instanceName)
+ && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
+ getInstanceConfig(clusterName, instanceName).getInstanceOperation());
+ }
+
+  /**
+   * Return true if the instance has any current state or pending message. Return false if the
+   * instance is offline (no LiveInstance node), or if it is live but has neither a pending message
+   * nor any current state under its live session.
+   * <p>
+   * If current-state folders for more than one session exist (state carried over from a previous
+   * session), the instance is treated as still holding current state.
+   * @param clusterName the cluster name
+   * @param instanceName the instance name
+   * @return true if the instance still holds current state or has pending messages; false otherwise
+   */
+  private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) {
+    // Share a single base accessor between the typed accessor and the raw ZNode reads below.
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // check the instance is alive
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName,
+          clusterName);
+      return false;
+    }
+
+    // Count session ZNodes under the CurrentState folder; more than one means state is carried over
+    // from a previous session. getChildNames may return null, so guard before calling size().
+    List<String> sessions =
+        baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0);
+    if (sessions != null && sessions.size() > 1) {
+      logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName,
+          clusterName);
+      return true;
+    }
+
+    // Check pending messages before the per-session current state, so messages are still detected
+    // when no current-state folder exists for the live session. Only presence matters, so message
+    // names are enough; the message bodies are not read.
+    List<String> messages = accessor.getChildNames(keyBuilder.messages(instanceName));
+    if (messages != null && !messages.isEmpty()) {
+      logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName);
+      return true;
+    }
+
+    String sessionId = liveInstance.getEphemeralOwner();
+    String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
+    List<String> currentStates = baseAccessor.getChildNames(path, 0);
+    if (currentStates == null) {
+      logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    return !currentStates.isEmpty();
+  }
+
+ @Override
+ public void enableResource(final String clusterName, final String resourceName, final boolean enabled) {
+ logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName);
String path = PropertyPathBuilder.idealState(clusterName, resourceName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 3f459318b..6c51d58bb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -11,17 +11,18 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
-import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -57,6 +58,8 @@ public class TestInstanceOperation extends ZkTestBase {
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;
+
+ private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;
@@ -91,6 +94,8 @@ public class TestInstanceOperation extends ZkTestBase {
createTestDBs(200);
setUpWagedBaseline();
+
+ _admin = new ZKHelixAdmin(_gZkClient);
}
@Test
@@ -106,7 +111,6 @@ public class TestInstanceOperation extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
- System.out.println("123");
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// New ev should contain all instances but the evacuated one
@@ -119,6 +123,9 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}
+
+ Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+ Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}
@Test(dependsOnMethods = "testEvacuate")
@@ -215,6 +222,8 @@ public class TestInstanceOperation extends ZkTestBase {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
}
+ Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+ Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
@@ -261,6 +270,7 @@ public class TestInstanceOperation extends ZkTestBase {
// message should be pending at the to evacuate participant
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
+ Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
// cancel evacuation
_gSetupTool.getClusterManagementTool()
@@ -323,7 +333,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}
-
+ Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}
@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 81993475b..512a7b4db 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -550,4 +550,14 @@ public class MockHelixAdmin implements HelixAdmin {
List<String> instancesNames) {
return null;
}
+
+  @Override
+  public boolean isEvacuateFinished(String clusterName, String instanceName) {
+    // Mock stub: evacuation is always reported as unfinished.
+    return false;
+  }
+
+  @Override
+  public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
+    // Mock stub: the instance is never reported as ready.
+    return false;
+  }
}