You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/06/24 16:09:52 UTC

[helix] branch master updated: implement util for cloud event (#2149)

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new dadaba13c implement util for cloud event (#2149)
dadaba13c is described below

commit dadaba13c3717ffbad619a9f4a064ed987ea2454
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 24 09:09:47 2022 -0700

    implement util for cloud event (#2149)
    
    This change adds the implementation for HelixEventHandlingUtil and DefaultCloudEventCallbackImpl.
---
 .../event/helix/DefaultCloudEventCallbackImpl.java |  77 +++++++++++---
 .../cloud/event/helix/HelixCloudEventListener.java |   4 +-
 .../cloud/event/helix/HelixEventHandlingUtil.java  | 111 +++++++++++++++++----
 .../event/TestDefaultCloudEventCallbackImpl.java   | 111 +++++++++++++++++++++
 4 files changed, 266 insertions(+), 37 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index d9aa3b2d1..04ad4b798 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -19,51 +19,102 @@ package org.apache.helix.cloud.event.helix;
  * under the License.
  */
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * A default callback implementation class to be used in {@link HelixCloudEventListener}
  */
 public class DefaultCloudEventCallbackImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCloudEventCallbackImpl.class);
+  private static final String INSTANCE_REASON = "Cloud event in DefaultCloudEventCallback at %s";
+  private static final String EMM_REASON = "Cloud event EMM in DefaultCloudEventCallback by %s at %s";
 
   /**
-   * Disable the instance
+   * Disable the instance and track the cloud event in map field disabledInstancesWithInfo in
+   * cluster config. Will not re-disable the instance if it is already disabled for another
+   * reason, so the existing disabled type/reason is not overwritten and the instance is not
+   * re-enabled by {@link #enableInstance} when the cloud event ends.
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void disableInstance(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    String message = String.format(INSTANCE_REASON, System.currentTimeMillis());
+    LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", manager.getInstanceName());
+    if (InstanceValidationUtil
+        .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
+      manager.getClusterManagmentTool()
+          .enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
+              InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+    }
+    HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
+        manager.getInstanceName(), manager.getHelixDataAccessor().getBaseDataAccessor(), false,
+        message);
   }
 
   /**
-   * Enable the instance
+   * Remove the tracked cloud event from cluster config, then enable the instance. The instance
+   * is only enabled when its disabled type is CLOUD_EVENT; other disables are left untouched.
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void enableInstance(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    LOG.info("DefaultCloudEventCallbackImpl enable Instance {}", manager.getInstanceName());
+    String instanceName = manager.getInstanceName();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    String message = String.format(INSTANCE_REASON, System.currentTimeMillis());
+    HelixEventHandlingUtil
+        .updateCloudEventOperationInClusterConfig(manager.getClusterName(), instanceName,
+            accessor.getBaseDataAccessor(), true, message);
+    if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, accessor)) {
+      manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, true,
+          InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+    }
   }
 
   /**
-   *
+   * Enter maintenance mode if the cluster is not already in maintenance mode.
+   * TODO: add a cloud-event-specific maintenance reason when entering MM for a cloud event
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    if (!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName())) {
+      LOG.info("DefaultCloudEventCallbackImpl enterMaintenanceMode by {}",
+          manager.getInstanceName());
+      manager.getClusterManagmentTool()
+          .manuallyEnableMaintenanceMode(manager.getClusterName(), true,
+              String.format(EMM_REASON, manager.getInstanceName(), System.currentTimeMillis()),
+              null);
+    }
   }
 
   /**
-   *
+   * Exit maintenance mode when cluster config tracks no instance still under cloud event handling.
+   * TODO: also check the maintenance reason and only exit when EMM was caused by a cloud event
    * @param manager The helix manager associated with the listener
    * @param eventInfo Detailed information about the event
    */
   public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
-    // To be implemented
-    throw new NotImplementedException();
+    ClusterConfig clusterConfig = manager.getHelixDataAccessor()
+        .getProperty(manager.getHelixDataAccessor().keyBuilder().clusterConfig());
+    if (HelixEventHandlingUtil.checkNoInstanceUnderCloudEvent(clusterConfig)) {
+      LOG.info("DefaultCloudEventCallbackImpl exitMaintenanceMode by {}",
+          manager.getInstanceName());
+      manager.getClusterManagmentTool()
+          .manuallyEnableMaintenanceMode(manager.getClusterName(), false,
+              String.format(EMM_REASON, manager.getInstanceName(), System.currentTimeMillis()),
+              null);
+    } else {
+      LOG.info("DefaultCloudEventCallbackImpl will not exitMaintenanceMode as instances are still "
+          + "under cloud event ({} disabled instances tracked in cluster config)",
+          clusterConfig.getDisabledInstancesWithInfo().size());
+    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
index 57d5a43b3..57e5eb1ee 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
@@ -107,9 +107,9 @@ public class HelixCloudEventListener implements CloudEventListener {
     try {
       LOG.info("Loading class: " + implClassName);
       implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
-          .newInstance();
+          .getConstructor().newInstance();
     } catch (Exception e) {
-      implClass = DefaultCloudEventCallbackImpl.class.newInstance();
+      implClass = new DefaultCloudEventCallbackImpl();
       LOG.error(
           "No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
           implClassName, e.getMessage());
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
index 7a17c3f2e..ee96a13ee 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
@@ -19,40 +19,107 @@ package org.apache.helix.cloud.event.helix;
  * under the License.
  */
 
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.ConfigStringUtil;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
- class HelixEventHandlingUtil {
+class HelixEventHandlingUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HelixEventHandlingUtil.class);
 
   /**
-   * Enable or disable an instance for cloud event.
-   * It will enable/disable Helix for that instance. Also add the instance cloud event info to
-   * clusterConfig Znode when enable.
-   * @param clusterName
+   * Check whether the instance is disabled because of a cloud event.
    * @param instanceName
-   * @param message
-   * @param isEnable
    * @param dataAccessor
-   * @return return failure when either enable/disable failed or update cluster ZNode failed.
+   * @return true only when the instance is Helix-disabled and its disabled type in
+   * instanceConfig is CLOUD_EVENT. Throws HelixException if the instance config is missing.
    */
-   static boolean enableInstanceForCloudEvent(String clusterName, String instanceName, String message,
-      boolean isEnable, BaseDataAccessor dataAccessor) {
-    // TODO add impl here
-    return true;
+  static boolean isInstanceDisabledForCloudEvent(String instanceName,
+      HelixDataAccessor dataAccessor) {
+    InstanceConfig instanceConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceName));
+    if (instanceConfig == null) {
+      throw new HelixException("Instance: " + instanceName
+          + ", instance config does not exist");
+    }
+    return !InstanceValidationUtil.isEnabled(dataAccessor, instanceName)
+        && InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name()
+        .equals(instanceConfig.getInstanceDisabledType());
   }
 
   /**
-   * check if instance is disabled by cloud event.
-   * @param clusterName
-   * @param instanceName
-   * @param dataAccessor
-   * @return return true only when instance is Helix disabled and has the cloud event info in
-   * clusterConfig ZNode.
+   * Track (enable=false) or untrack (enable=true) the instance in disabledInstancesWithInfo.
    */
-   static boolean IsInstanceDisabledForCloudEvent(String clusterName, String instanceName,
-      BaseDataAccessor dataAccessor) {
-    // TODO add impl here
-    return true;
+  static void updateCloudEventOperationInClusterConfig(String clusterName, String instanceName,
+      BaseDataAccessor baseAccessor, boolean enable, String message) {
+    String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ": cluster config does not exist");
+    }
+
+    if (!baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ": cluster config is null");
+        }
+
+        ClusterConfig clusterConfig = new ClusterConfig(currentData);
+        Map<String, String> disabledInstancesWithInfo =
+            new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo());
+        if (enable) {
+          disabledInstancesWithInfo.remove(instanceName);
+        } else {
+          // Tagged CLOUD_EVENT so checkNoInstanceUnderCloudEvent recognizes this entry.
+          String timeStamp = String.valueOf(System.currentTimeMillis());
+          disabledInstancesWithInfo.put(instanceName, ZKHelixAdmin
+              .assembleInstanceBatchedDisabledInfo(
+                  InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message, timeStamp));
+        }
+        clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);
+
+        return clusterConfig.getRecord();
+      }
+    }, AccessOption.PERSISTENT)) {
+      LOG.error("Failed to update cluster config {} for {} instance {}. {}", clusterName,
+          enable ? "enable" : "disable", instanceName, message);
+    }
   }
 
+  /**
+   * Return true if no instance tracked in cluster config is under cloud event handling.
+   * @param clusterConfig cluster config to inspect; must not be null
+   * @return false if any disabledInstancesWithInfo entry has disabled type CLOUD_EVENT
+   */
+  static boolean checkNoInstanceUnderCloudEvent(ClusterConfig clusterConfig) {
+    Map<String, String> clusterConfigTrackedEvent = clusterConfig.getDisabledInstancesWithInfo();
+    if (clusterConfigTrackedEvent == null || clusterConfigTrackedEvent.isEmpty()) {
+      return true;
+    }
+
+    for (Map.Entry<String, String> entry : clusterConfigTrackedEvent.entrySet()) {
+      if (InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name().equals(ConfigStringUtil
+          .parseConcatenatedConfig(entry.getValue())
+          .get(ClusterConfig.ClusterConfigProperty.HELIX_DISABLED_TYPE.toString()))) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
new file mode 100644
index 000000000..bf9b59dc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -0,0 +1,111 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
+  private final DefaultCloudEventCallbackImpl _impl = new DefaultCloudEventCallbackImpl();
+  private MockParticipantManager _instanceManager;
+  private HelixAdmin _admin;
+
+  public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException, InstantiationException {
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _instanceManager = _participants[0];
+    _admin = _instanceManager.getClusterManagmentTool();
+  }
+
+  @Test
+  public void testDisableInstance() {
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(_manager.getConfigAccessor()
+        .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+        .getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+
+    // Should not disable instance if it is already disabled due to other reasons
+    // And disabled type should remain unchanged
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(_manager.getConfigAccessor()
+            .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+            .getInstanceDisabledType(),
+        InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+    // Leave the instance disabled with type CLOUD_EVENT; testEnableInstance starts from here.
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
+        InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+  }
+
+  @Test (dependsOnMethods = "testDisableInstance")
+  public void testEnableInstance() {
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    // Should enable instance if the instance is disabled due to cloud event
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+
+    // Should not enable instance if it is not disabled due to cloud event
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
+        true);
+  }
+
+  @Test (dependsOnMethods = "testEnableInstance")
+  public void testEnterMaintenanceMode() {
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    _impl.enterMaintenanceMode(_instanceManager, null);
+    _impl.disableInstance(_instanceManager, null); // leaves a tracked cloud event for exit test
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+
+  @Test (dependsOnMethods = "testEnterMaintenanceMode")
+  public void testExitMaintenanceMode() {
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    // Should not exit maintenance mode while cluster config still tracks a cloud event instance
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+
+    // Should exit maintenance mode once no instance is tracked as under cloud event
+    _impl.enableInstance(_instanceManager, null);
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+}
\ No newline at end of file