You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/06/06 19:04:48 UTC

[helix] branch master updated: Clean format for disabled instances in ClusterConfig (#2106)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new efd4ce0a1 Clean format for disabled instances in ClusterConfig (#2106)
efd4ce0a1 is described below

commit efd4ce0a14a60da1c2546cfafa32135c10349dbc
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Jun 6 12:04:43 2022 -0700

    Clean format for disabled instances in ClusterConfig (#2106)
    
    We reverted enabling batch mode for instance enable/disable. This change hooks into the readData stage when the controller first starts and cleans up the batch-disabled-instances map field (DISABLED_INSTANCES) in the cluster config.
    This change is temporary and should be reverted two releases from now (1.0.5.0 or later).
---
 .../dataproviders/BaseControllerDataProvider.java  | 68 +++++++++++++++++-----
 .../integration/TestBatchEnableInstances.java      |  1 -
 .../controller/TestControllerLeadershipChange.java | 54 +++++++++++++++++
 .../helix/rest/server/TestPerInstanceAccessor.java | 10 ++++
 4 files changed, 119 insertions(+), 14 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index f81f4b424..8e8f9fa9b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -63,6 +63,8 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,8 +76,7 @@ import org.slf4j.LoggerFactory;
  * This class will be moved to helix-common module in the future
  */
 public class BaseControllerDataProvider implements ControlContextProvider {
-  private static final Logger logger =
-      LoggerFactory.getLogger(BaseControllerDataProvider.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseControllerDataProvider.class);
 
   // We only refresh EV and TEV the very first time the cluster data cache is initialized
   private static final List<HelixConstants.ChangeType> _noFullRefreshProperty = Arrays
@@ -129,8 +130,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       // refresh every type when it is initialized
-      _propertyDataChangedMap
-          .put(type, new AtomicBoolean(true));
+      _propertyDataChangedMap.put(type, new AtomicBoolean(true));
     }
 
     // initialize caches
@@ -241,21 +241,64 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
       _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
       refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
+      // TODO: This is a temp function to clean up incompatible batched disabled instances format.
+      // Remove in later version.
+      if (_clusterConfig!=null && needCleanUpBatchedDisabledInstance(_clusterConfig.getRecord())
+          && cleanBatchDisableMapField(accessor)) {
+        LogUtil.logInfo(logger, getClusterEventId(), String
+            .format("Clean ClusterConfig mapField for cluster %s, pipeline %s", _clusterName,
+                getPipelineName()));
+      }
       refreshAbnormalStateResolverMap(_clusterConfig);
     } else {
-      LogUtil.logInfo(logger, getClusterEventId(), String.format(
-          "No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
+      LogUtil.logDebug(logger, getClusterEventId(), String
+          .format("No ClusterConfig change for cluster %s, pipeline %s", _clusterName,
+              getPipelineName()));
     }
   }
 
+  // TODO: This function is used to clean up batched disabled instances for
+  // "DISABLED_INSTANCES" introduced in 1.0.3.0. This temp change should be reverted after 1.0.5.0 \
+  // or later version.
+  private boolean cleanBatchDisableMapField(final HelixDataAccessor accessor) {
+    boolean successful =
+        accessor.updateProperty(accessor.keyBuilder().clusterConfig(), new DataUpdater<ZNRecord>() {
+          @Override
+          public ZNRecord update(ZNRecord currentData) {
+            if (currentData == null) {
+              throw new HelixException(
+                  "Cluster: " + _clusterConfig.getClusterName() + ": cluster config is null");
+            }
+            ZNRecord newRecord = new ZNRecord(currentData);
+            String batchDisabledInstanceMapFieldKey =
+                ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name();
+            if (needCleanUpBatchedDisabledInstance(currentData)) {
+              newRecord.getMapFields().remove(batchDisabledInstanceMapFieldKey);
+            }
+            return newRecord;
+          }
+        }, null);
+    if (!successful) {
+      LogUtil.logError(logger, getClusterEventId(), String
+          .format("Failed to clean ClusterConfig change for cluster %s, pipeline %s", _clusterName,
+              getPipelineName()));
+    }
+    return successful;
+  }
+
+  private boolean needCleanUpBatchedDisabledInstance(ZNRecord record) {
+    return record!=null && record.getMapFields()!=null && record.getMapFields()
+        .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name());
+  }
+
   private void refreshIdealState(final HelixDataAccessor accessor,
       Set<HelixConstants.ChangeType> refreshedType) {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE).getAndSet(false)) {
       _idealStateCache.refresh(accessor);
       refreshedType.add(HelixConstants.ChangeType.IDEAL_STATE);
     } else {
-      LogUtil.logInfo(logger, getClusterEventId(), String
-          .format("No ideal state change for %s cluster, %s pipeline", _clusterName,
+      LogUtil.logInfo(logger, getClusterEventId(),
+          String.format("No ideal state change for %s cluster, %s pipeline", _clusterName,
               getPipelineName()));
     }
   }
@@ -557,7 +600,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
     Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);
     if (_disabledInstanceForPartitionMap.containsKey(resource)
-        && _disabledInstanceForPartitionMap.get(resource).containsKey(partition)) {
+        && _disabledInstanceForPartitionMap
+        .get(resource).containsKey(partition)) {
       disabledInstancesForPartition
           .addAll(_disabledInstanceForPartitionMap.get(resource).get(partition));
     }
@@ -767,8 +811,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (!_updateInstanceOfflineTime) {
       return;
     }
-    List<String> offlineNodes =
-        new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
+    List<String> offlineNodes = new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
     offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet());
     _instanceOfflineTimeMap = new HashMap<>();
 
@@ -809,8 +852,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
         _disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
         for (String partition : disabledPartitionMap.get(resource)) {
           _disabledInstanceForPartitionMap.get(resource)
-              .computeIfAbsent(partition, key -> new HashSet<>())
-              .add(config.getInstanceName());
+              .computeIfAbsent(partition, key -> new HashSet<>()).add(config.getInstanceName());
         }
       }
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
index 65c947269..98a8824e2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.integration.task.TaskTestBase;
 import org.apache.helix.integration.task.WorkflowGenerator;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
index 379feb90a..ee9f27720 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
@@ -21,11 +21,14 @@ package org.apache.helix.integration.controller;
 
 import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -35,12 +38,17 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -74,6 +82,51 @@ public class TestControllerLeadershipChange extends ZkTestBase {
   }
 
   @Test
+  public void testControllerCleanUpClusterConfig() {
+    ZkBaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor(_gZkClient);
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, "DISABLED_Instance");
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, "DISABLED_Instance", false);
+
+    baseDataAccessor.update(PropertyPathBuilder.clusterConfig(CLUSTER_NAME),new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + CLUSTER_NAME + ": cluster config is null");
+        }
+
+        ClusterConfig clusterConfig = new ClusterConfig(currentData);
+        Map<String, String> disabledInstances = new TreeMap<>(clusterConfig.getDisabledInstances());
+        disabledInstances.put("DISABLED_Instance", "HELIX_ENABLED_DISABLE_TIMESTAMP=1652338376608");
+        clusterConfig.setDisabledInstances(disabledInstances);
+
+        return clusterConfig.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
+    controller.syncStart();
+    verifyControllerIsLeader(controller);
+
+    // Create cluster verifier
+    ZkHelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+            .build();
+
+    // Wait for rebalanced
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+    ZKHelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, baseDataAccessor);
+    ClusterConfig cls = helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig());
+    Assert.assertFalse(cls.getRecord().getMapFields()
+        .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));
+
+    controller.syncStop();
+    verifyControllerIsNotLeader(controller);
+    verifyZKDisconnected(controller);
+  }
+
+    @Test
   public void testControllerConnectThenDisconnect() {
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
@@ -182,6 +235,7 @@ public class TestControllerLeadershipChange extends ZkTestBase {
     Assert.assertTrue(controller.getZkClient().isClosed());
   }
 
+
   @Test
   public void testMissingTopStateDurationMonitoring() throws Exception {
     String clusterName = "testCluster-TestControllerLeadershipChange";
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index ab87864ad..5d2fc7082 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -403,6 +403,11 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
         _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getInstanceDisabledReason(),
         "");
 
+    // We should see no instance disable related field in to clusterConfig
+    ClusterConfig cls = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    Assert.assertFalse(cls.getRecord().getMapFields()
+        .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));
+
     // disable instance with no reason input
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=disable")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
@@ -415,6 +420,11 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
     Assert.assertTrue(
         _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getInstanceEnabled());
 
+    // Disable instance should see no field write to clusterConfig
+    cls = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    Assert.assertFalse(cls.getRecord().getMapFields()
+        .containsKey(ClusterConfig.ClusterConfigProperty.DISABLED_INSTANCES.name()));
+
     // AddTags
     List<String> tagList = ImmutableList.of("tag3", "tag1", "tag2");
     entity = Entity.entity(