You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/06/24 16:09:52 UTC
[helix] branch master updated: implement util for cloud event (#2149)
This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new dadaba13c implement util for cloud event (#2149)
dadaba13c is described below
commit dadaba13c3717ffbad619a9f4a064ed987ea2454
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 24 09:09:47 2022 -0700
implement util for cloud event (#2149)
This change adds the implementation of the cloud event utility (HelixEventHandlingUtil) and of DefaultCloudEventCallbackImpl.
---
.../event/helix/DefaultCloudEventCallbackImpl.java | 77 +++++++++++---
.../cloud/event/helix/HelixCloudEventListener.java | 4 +-
.../cloud/event/helix/HelixEventHandlingUtil.java | 111 +++++++++++++++++----
.../event/TestDefaultCloudEventCallbackImpl.java | 111 +++++++++++++++++++++
4 files changed, 266 insertions(+), 37 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index d9aa3b2d1..04ad4b798 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -19,51 +19,102 @@ package org.apache.helix.cloud.event.helix;
* under the License.
*/
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A default callback implementation class to be used in {@link HelixCloudEventListener}
*/
public class DefaultCloudEventCallbackImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultCloudEventCallbackImpl.class);
+ private final String _instanceReason = "Cloud event in DefaultCloudEventCallback at %s";
+ private final String _emmReason = "Cloud event EMM in DefaultCloudEventCallback by %s at %s";
/**
- * Disable the instance
+ * Disable the instance and track the cloud event in map field disabledInstancesWithInfo in
+ * cluster config. Will not re-disable the instance if the instance is already disabled for
+ * other reason. (So we will not overwrite the disabled reason and enable this instance when
+ * on-unpause)
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void disableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ String message = String.format(_instanceReason, System.currentTimeMillis());
+ LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", manager.getInstanceName());
+ if (InstanceValidationUtil
+ .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
+ manager.getClusterManagmentTool()
+ .enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+ }
+ HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
+ manager.getInstanceName(), manager.getHelixDataAccessor().getBaseDataAccessor(), false,
+ message);
}
/**
- * Enable the instance
+ * Remove tracked cloud event in cluster config and enable the instance
+ * We only enable instance that is disabled because of cloud event.
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ LOG.info("DefaultCloudEventCallbackImpl enable Instance {}", manager.getInstanceName());
+ String instanceName = manager.getInstanceName();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String message = String.format(_instanceReason, System.currentTimeMillis());
+ HelixEventHandlingUtil
+ .updateCloudEventOperationInClusterConfig(manager.getClusterName(), instanceName,
+ manager.getHelixDataAccessor().getBaseDataAccessor(), true, message);
+ if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, accessor)) {
+ manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, true,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+ }
}
/**
- *
+ * Will enter MM when the cluster is not in MM
+ * TODO: we should add maintenance reason when EMM with cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ if (!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName())) {
+ LOG.info("DefaultCloudEventCallbackImpl enterMaintenanceMode by {}",
+ manager.getInstanceName());
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), true,
+ String.format(_emmReason, manager.getInstanceName(), System.currentTimeMillis()),
+ null);
+ }
}
/**
- *
+ * Will exit MM when when cluster config tracks no ongoing cloud event being handling
+ * TODO: we should also check the maintenance reason and only exit when EMM is caused by cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ ClusterConfig clusterConfig = manager.getHelixDataAccessor()
+ .getProperty(manager.getHelixDataAccessor().keyBuilder().clusterConfig());
+ if (HelixEventHandlingUtil.checkNoInstanceUnderCloudEvent(clusterConfig)) {
+ LOG.info("DefaultCloudEventCallbackImpl exitMaintenanceMode by {}",
+ manager.getInstanceName());
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), false,
+ String.format(_emmReason, manager.getInstanceName(), System.currentTimeMillis()),
+ null);
+ } else {
+ LOG.info(
+ "DefaultCloudEventCallbackImpl will not exitMaintenanceMode as there are {} instances under cloud event",
+ clusterConfig.getDisabledInstancesWithInfo().keySet().size());
+ }
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
index 57d5a43b3..57e5eb1ee 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
@@ -107,9 +107,9 @@ public class HelixCloudEventListener implements CloudEventListener {
try {
LOG.info("Loading class: " + implClassName);
implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
- .newInstance();
+ .getConstructor().newInstance();
} catch (Exception e) {
- implClass = DefaultCloudEventCallbackImpl.class.newInstance();
+ implClass = new DefaultCloudEventCallbackImpl();
LOG.error(
"No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
implClassName, e.getMessage());
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
index 7a17c3f2e..ee96a13ee 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
@@ -19,40 +19,107 @@ package org.apache.helix.cloud.event.helix;
* under the License.
*/
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.ConfigStringUtil;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- class HelixEventHandlingUtil {
+class HelixEventHandlingUtil {
+ private static Logger LOG = LoggerFactory.getLogger(HelixEventHandlingUtil.class);
/**
- * Enable or disable an instance for cloud event.
- * It will enable/disable Helix for that instance. Also add the instance cloud event info to
- * clusterConfig Znode when enable.
- * @param clusterName
+ * check if instance is disabled by cloud event.
* @param instanceName
- * @param message
- * @param isEnable
* @param dataAccessor
- * @return return failure when either enable/disable failed or update cluster ZNode failed.
+ * @return return true only when instance is Helix disabled and the disabled reason in
+ * instanceConfig is cloudEvent
*/
- static boolean enableInstanceForCloudEvent(String clusterName, String instanceName, String message,
- boolean isEnable, BaseDataAccessor dataAccessor) {
- // TODO add impl here
- return true;
+ static boolean isInstanceDisabledForCloudEvent(String instanceName,
+ HelixDataAccessor dataAccessor) {
+ InstanceConfig instanceConfig =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceName));
+ if (instanceConfig == null) {
+ throw new HelixException("Instance: " + instanceName
+ + ", instance config does not exist");
+ }
+ return !InstanceValidationUtil.isEnabled(dataAccessor, instanceName) && instanceConfig
+ .getInstanceDisabledType()
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
}
/**
- * check if instance is disabled by cloud event.
- * @param clusterName
- * @param instanceName
- * @param dataAccessor
- * @return return true only when instance is Helix disabled and has the cloud event info in
- * clusterConfig ZNode.
+ * Update map field disabledInstancesWithInfo in clusterConfig with cloudEvent instance info
*/
- static boolean IsInstanceDisabledForCloudEvent(String clusterName, String instanceName,
- BaseDataAccessor dataAccessor) {
- // TODO add impl here
- return true;
+ static void updateCloudEventOperationInClusterConfig(String clusterName, String instanceName,
+ BaseDataAccessor baseAccessor, boolean enable, String message) {
+ String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+ if (!baseAccessor.exists(path, 0)) {
+ throw new HelixException("Cluster " + clusterName + ": cluster config does not exist");
+ }
+
+ if (!baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ": cluster config is null");
+ }
+
+ ClusterConfig clusterConfig = new ClusterConfig(currentData);
+ Map<String, String> disabledInstancesWithInfo =
+ new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo());
+ if (enable) {
+ disabledInstancesWithInfo.keySet().remove(instanceName);
+ } else {
+ // disabledInstancesWithInfo is only used for cloud event handling.
+ String timeStamp = String.valueOf(System.currentTimeMillis());
+ disabledInstancesWithInfo.put(instanceName, ZKHelixAdmin
+ .assembleInstanceBatchedDisabledInfo(
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message, timeStamp));
+ }
+ clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);
+
+ return clusterConfig.getRecord();
+ }
+ }, AccessOption.PERSISTENT)) {
+ LOG.error("Failed to update cluster config {} for {} instance {}. {}", clusterName,
+ enable ? "enable" : "disable", instanceName, message);
+ }
}
+ /**
+ * Return true if no instance is under cloud event handling
+ * @param clusterConfig
+ * @return
+ */
+ static boolean checkNoInstanceUnderCloudEvent(ClusterConfig clusterConfig) {
+ Map<String, String> clusterConfigTrackedEvent = clusterConfig.getDisabledInstancesWithInfo();
+ if (clusterConfigTrackedEvent == null || clusterConfigTrackedEvent.isEmpty()) {
+ return true;
+ }
+
+ for (Map.Entry<String, String> entry : clusterConfigTrackedEvent.entrySet()) {
+ if (ConfigStringUtil.parseConcatenatedConfig(entry.getValue())
+ .get(ClusterConfig.ClusterConfigProperty.HELIX_DISABLED_TYPE.toString())
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
new file mode 100644
index 000000000..bf9b59dc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -0,0 +1,111 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
+  private final DefaultCloudEventCallbackImpl _impl = new DefaultCloudEventCallbackImpl();
+  private MockParticipantManager _instanceManager;
+  private HelixAdmin _admin;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _instanceManager = _participants[0];
+    _admin = _instanceManager.getClusterManagmentTool();
+  }
+
+  @Test
+  public void testDisableInstance() {
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(_manager.getConfigAccessor()
+        .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+        .getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+
+    // Should not disable instance if it is already disabled due to other reasons
+    // And disabled type should remain unchanged
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    Assert.assertEquals(_manager.getConfigAccessor()
+            .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+            .getInstanceDisabledType(),
+        InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+
+    // Leave the instance disabled by a cloud event as the starting state of testEnableInstance.
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false,
+        InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+  }
+
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testEnableInstance() {
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    // Should enable instance if the instance is disabled due to cloud event
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertTrue(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+
+    // Should not enable instance if it is not disabled due to cloud event
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false);
+    _impl.enableInstance(_instanceManager, null);
+    Assert.assertFalse(InstanceValidationUtil
+        .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName()));
+    _admin.enableInstance(_instanceManager.getClusterName(), _instanceManager.getInstanceName(),
+        true);
+  }
+
+  // Runs after the enable/disable tests: those tests add and remove the cloud event tracked in
+  // cluster config, which would otherwise interfere with the maintenance-mode assertions below.
+  @Test(dependsOnMethods = "testEnableInstance")
+  public void testEnterMaintenanceMode() {
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    _impl.enterMaintenanceMode(_instanceManager, null);
+    // Track an ongoing cloud event so testExitMaintenanceMode can verify MM is kept.
+    _impl.disableInstance(_instanceManager, null);
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+
+  @Test(dependsOnMethods = "testEnterMaintenanceMode")
+  public void testExitMaintenanceMode() {
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+    // Should not exit maintenance mode if there is remaining live instance that is disabled due to cloud event
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+
+    // Should exit maintenance mode if there is no remaining live instance that is disabled due to cloud event
+    _impl.enableInstance(_instanceManager, null);
+    _impl.exitMaintenanceMode(_instanceManager, null);
+    Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+  }
+}
\ No newline at end of file