You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/03/24 16:51:16 UTC

[helix] 02/02: Enable HelixManager as an event listener (#1978)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-vm-freeze
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3cf41231e1108f2d6250336177eb3851ed53f5fb
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 21 13:31:35 2022 -0700

    Enable HelixManager as an event listener (#1978)
    
    Make the Helix manager cloud-event aware by registering a cloud event listener when the Helix manager connects
---
 .../java/org/apache/helix/HelixCloudProperty.java  |  21 ++
 .../helix/cloud/event/CloudEventHandler.java       |  32 +--
 .../helix/cloud/event/CloudEventListener.java      |  24 ++-
 .../event/helix/CloudEventCallbackProperty.java    | 138 ++++++++++++
 .../event/helix/DefaultCloudEventCallbackImpl.java |  69 ++++++
 .../cloud/event/helix/HelixCloudEventListener.java | 121 +++++++++++
 .../apache/helix/manager/zk/ZKHelixManager.java    |  32 +++
 .../cloud/event/MockCloudEventCallbackImpl.java    |  61 ++++++
 .../event/TestCloudEventCallbackProperty.java      | 239 +++++++++++++++++++++
 9 files changed, 702 insertions(+), 35 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
index 6cbc2e1..9af7175 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
 import org.apache.helix.model.CloudConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,10 @@ public class HelixCloudProperty {
   // Other customized properties that may be used.
   private Properties _customizedCloudProperties = new Properties();
 
+  private boolean _isCloudEventCallbackEnabled;
+
+  private CloudEventCallbackProperty _cloudEventCallbackProperty;
+
   /**
    * Initialize Helix Cloud Property based on the provider
    * @param
@@ -180,4 +185,20 @@ public class HelixCloudProperty {
   public void setCustomizedCloudProperties(Properties customizedCloudProperties) {
     _customizedCloudProperties.putAll(customizedCloudProperties);
   }
+
+  public boolean isCloudEventCallbackEnabled() {
+    return _isCloudEventCallbackEnabled;
+  }
+
+  public void setCloudEventCallbackEnabled(boolean enabled) {
+    _isCloudEventCallbackEnabled = enabled;
+  }
+
+  public CloudEventCallbackProperty getCloudEventCallbackProperty() {
+    return _cloudEventCallbackProperty;
+  }
+
+  public void setCloudEventCallbackProperty(CloudEventCallbackProperty cloudEventCallbackProperty) {
+    _cloudEventCallbackProperty = cloudEventCallbackProperty;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
index 295194b..5dcbe16 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 public class CloudEventHandler {
   private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName());
   private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>();
-  private Optional<CloudEventListener> _preEventHandlerCallback;
-  private Optional<CloudEventListener> _postEventHandlerCallback;
+  private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty();
+  private Optional<CloudEventListener> _postEventHandlerCallback = Optional.empty();
 
   /**
    * Register an event listener to the event handler.
@@ -70,28 +70,12 @@ public class CloudEventHandler {
   }
 
   /**
-   * Trigger the callback / listeners in order of
-   * 1. PreEventHandlerCallback
-   * 2. Unordered CloudEventListener list
-   * 3. PostEventHandlerCallback
-   * @param eventInfo the object contains any information about the incoming event
+   * Trigger the registered listeners in order,
+   * and trigger the corresponding callback registered in the listeners for a certain type of event.
    */
-  public void onPause(Object eventInfo) {
-    _preEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo));
-    _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onPause(eventInfo));
-    _postEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo));
-  }
-
-  /**
-   * Trigger the callback / listeners in order of
-   * 1. PreEventHandlerCallback
-   * 2. Unordered CloudEventListener list
-   * 3. PostEventHandlerCallback
-   * @param eventInfo the object contains any information about the incoming event
-   */
-  public void onResume(Object eventInfo) {
-    _preEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo));
-    _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onResume(eventInfo));
-    _postEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo));
+  public void performAction(Object eventType, Object eventInfo) {
+    _preEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo));
+    _unorderedEventListenerList.parallelStream().forEach(listener -> listener.performAction(eventType, eventInfo));
+    _postEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo));
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
index b4adcc3..5e75337 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
@@ -24,25 +24,27 @@ package org.apache.helix.cloud.event;
  * The listeners can be registered to {@link CloudEventHandler}.
  */
 public interface CloudEventListener {
-  enum ListenerType {
-    PRE_EVENT_HANDLER, UNORDERED, POST_EVENT_HANDLER
-  }
-
   /**
-   * Callback for when a pause event is incoming
-   * @param eventInfo the info of the incoming event
+   * Defines different listener types
+   * It determines the position at which this listener is triggered in {@link CloudEventHandler}
+   * Order being: PRE_EVENT_HANDLER -> UNORDERED (in parallel) -> POST_EVENT_HANDLER
    */
-  void onPause(Object eventInfo);
+  enum ListenerType {
+    PRE_EVENT_HANDLER,
+    UNORDERED,
+    POST_EVENT_HANDLER
+  }
 
   /**
-   * Callback for when a pause event finishes
-   * @param eventInfo the info of the incoming event
+   * Perform action to react to the event
+   * @param eventType Type of the event
+   * @param eventInfo Detailed information about the event
    */
-  void onResume(Object eventInfo);
+  void performAction(Object eventType, Object eventInfo);
 
   /**
    * Get the listener type of a listener
-   * @return the type of the listener
+   * @return The type of the listener
    */
   ListenerType getListenerType();
 }
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
new file mode 100644
index 0000000..1888eb6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
@@ -0,0 +1,138 @@
+package org.apache.helix.cloud.event.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.helix.HelixManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A property for users to customize the behavior of a Helix manager as a cloud event listener
+ */
+public class CloudEventCallbackProperty {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CloudEventCallbackProperty.class.getName());
+
+  private final Set<HelixOperation> _enabledHelixOperation = new HashSet<>();
+  private final Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>>
+      _userDefinedCallbackMap = new HashMap<>();
+  private final Map<String, String> _userArgs;
+
+  /**
+   * Constructor
+   * @param userArgs A map containing information that users pass in; may be null
+   */
+  public CloudEventCallbackProperty(Map<String, String> userArgs) {
+    _userArgs = userArgs;
+  }
+
+  /**
+   * Keys for retrieving information from the map users pass in
+   */
+  public static class UserArgsInputKey {
+    public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName";
+  }
+
+  /**
+   * A collection of types of Helix operations
+   */
+  public enum HelixOperation {
+    ENABLE_DISABLE_INSTANCE,
+    MAINTENANCE_MODE
+  }
+
+  /**
+   * A collection of types and positions for user to plug in customized callback
+   */
+  public enum UserDefinedCallbackType {
+    PRE_ON_PAUSE,
+    POST_ON_PAUSE,
+    PRE_ON_RESUME,
+    POST_ON_RESUME,
+  }
+
+  /**
+   * Enable or disable a Helix-supported operation
+   * The operation is implemented in the callback impl class
+   * @param operation operation type
+   * @param enabled true to enable the operation, false to disable it
+   */
+  public void setHelixOperationEnabled(HelixOperation operation, boolean enabled) {
+    if (enabled) {
+      _enabledHelixOperation.add(operation);
+    } else {
+      _enabledHelixOperation.remove(operation);
+    }
+  }
+
+  /**
+   * Register a user defined callback at a user specified position
+   * The position is relative to Helix operations
+   * There are two options for each type (onPause or onResume):
+   * 1. PRE: The user defined callback will be the first callback being called in the listener
+   * 2. POST: The user defined callback will be the last callback being called in the listener
+   * @param callbackType The type and position for registering the callback
+   * @param callback The implementation of the callback
+   */
+  public void registerUserDefinedCallback(UserDefinedCallbackType callbackType,
+      BiConsumer<HelixManager, Object> callback) {
+    LOG.info("Registering callback {} as {} type user defined callback...", callback,
+        callbackType.name());
+    _userDefinedCallbackMap.put(callbackType, callback);
+  }
+
+  /**
+   * Unregister a user defined callback at a user specified position
+   * @param callbackType The type and position of the callback to unregister
+   */
+  public void unregisterUserDefinedCallback(UserDefinedCallbackType callbackType) {
+    LOG.info("Unregistering {} type user defined callback...", callbackType.name());
+    _userDefinedCallbackMap.remove(callbackType);
+  }
+
+  /**
+   * Get the user passed-in information
+   * @return Empty map if not defined; an unmodifiable view of the map otherwise
+   */
+  public Map<String, String> getUserArgs() {
+    return _userArgs == null ? Collections.emptyMap() : Collections.unmodifiableMap(_userArgs);
+  }
+
+  /**
+   * Get the map where user defined callbacks are stored (the internal map itself, not a copy)
+   */
+  public Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>> getUserDefinedCallbackMap() {
+    return _userDefinedCallbackMap;
+  }
+
+  /**
+   * Get the set where enabled Helix operations are stored (the internal set itself, not a copy)
+   */
+  public Set<HelixOperation> getEnabledHelixOperation() {
+    return _enabledHelixOperation;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
new file mode 100644
index 0000000..d9aa3b2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -0,0 +1,69 @@
+package org.apache.helix.cloud.event.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * A default callback implementation class to be used in {@link HelixCloudEventListener}
+ */
+public class DefaultCloudEventCallbackImpl {
+
+  /**
+   * Disable the instance (not implemented in this default class; always throws)
+   * @param manager The helix manager associated with the listener
+   * @param eventInfo Detailed information about the event
+   */
+  public void disableInstance(HelixManager manager, Object eventInfo) {
+    // To be implemented; a standard exception is thrown instead of a JDK-internal one
+    throw new UnsupportedOperationException("disableInstance is not implemented yet");
+  }
+
+  /**
+   * Enable the instance (not implemented in this default class; always throws)
+   * @param manager The helix manager associated with the listener
+   * @param eventInfo Detailed information about the event
+   */
+  public void enableInstance(HelixManager manager, Object eventInfo) {
+    // To be implemented; a standard exception is thrown instead of a JDK-internal one
+    throw new UnsupportedOperationException("enableInstance is not implemented yet");
+  }
+
+  /**
+   * Enter maintenance mode (not implemented in this default class; always throws)
+   * @param manager The helix manager associated with the listener
+   * @param eventInfo Detailed information about the event
+   */
+  public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
+    // To be implemented; a standard exception is thrown instead of a JDK-internal one
+    throw new UnsupportedOperationException("enterMaintenanceMode is not implemented yet");
+  }
+
+  /**
+   * Exit maintenance mode (not implemented in this default class; always throws)
+   * @param manager The helix manager associated with the listener
+   * @param eventInfo Detailed information about the event
+   */
+  public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
+    // To be implemented; a standard exception is thrown instead of a JDK-internal one
+    throw new UnsupportedOperationException("exitMaintenanceMode is not implemented yet");
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
new file mode 100644
index 0000000..57d5a43
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
@@ -0,0 +1,121 @@
+package org.apache.helix.cloud.event.helix;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.cloud.event.CloudEventListener;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation;
+import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType;
+
+/**
+ * A helix manager-based cloud event listener implementation
+ */
+public class HelixCloudEventListener implements CloudEventListener {
+  private static Logger LOG = LoggerFactory.getLogger(HelixCloudEventListener.class);
+
+  private final CloudEventCallbackProperty _property;
+  private final DefaultCloudEventCallbackImpl _callbackImplClass;
+  private final HelixManager _helixManager;
+
+  public HelixCloudEventListener(CloudEventCallbackProperty property, HelixManager helixManager)
+      throws InstantiationException, IllegalAccessException {
+    this._property = property;
+    this._helixManager = helixManager;
+    this._callbackImplClass = loadCloudEventCallbackImplClass(property.getUserArgs()
+        .getOrDefault(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            DefaultCloudEventCallbackImpl.class.getCanonicalName()));
+  }
+
+  /**
+   * The type of incoming event
+   */
+  public enum EventType {
+    ON_PAUSE, ON_RESUME
+  }
+
+  /**
+   * Below are lists defining the type and sequence of callbacks for each type of events
+   */
+  private final List<Object> onPauseOperations = Arrays
+      .asList(UserDefinedCallbackType.PRE_ON_PAUSE, HelixOperation.MAINTENANCE_MODE,
+          HelixOperation.ENABLE_DISABLE_INSTANCE, UserDefinedCallbackType.POST_ON_PAUSE);
+  private final List<Object> onResumeOperations = Arrays
+      .asList(UserDefinedCallbackType.PRE_ON_RESUME, HelixOperation.ENABLE_DISABLE_INSTANCE,
+          HelixOperation.MAINTENANCE_MODE, UserDefinedCallbackType.POST_ON_RESUME);
+
+  @Override
+  public void performAction(Object eventType, Object eventInfo) {
+    LOG.info("Received {} event, event info {}, timestamp {}. Acting on the event... "
+            + "Actor {}, based on callback implementation class {}.", ((EventType) eventType).name(),
+        eventInfo == null ? "N/A" : eventInfo.toString(), System.currentTimeMillis(), _helixManager,
+        _callbackImplClass.getClass().getCanonicalName());
+
+    if (eventType == EventType.ON_PAUSE) {
+      onPauseOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
+    } else if (eventType == EventType.ON_RESUME) {
+      onResumeOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
+    }
+  }
+
+  private void executeOperation(Object eventType, Object eventInfo, Object operation) {
+    Set<CloudEventCallbackProperty.HelixOperation> enabledHelixOperationSet =
+        _property.getEnabledHelixOperation();
+    if (HelixOperation.ENABLE_DISABLE_INSTANCE.equals(operation)) {
+      if (enabledHelixOperationSet.contains(HelixOperation.ENABLE_DISABLE_INSTANCE)) {
+        if (eventType == EventType.ON_PAUSE) {
+          _callbackImplClass.disableInstance(_helixManager, eventInfo);
+        } else {
+          _callbackImplClass.enableInstance(_helixManager, eventInfo);
+        }
+      }
+    } else if (HelixOperation.MAINTENANCE_MODE.equals(operation)) {
+      if (enabledHelixOperationSet.contains(HelixOperation.MAINTENANCE_MODE)) {
+        if (eventType == EventType.ON_PAUSE) {
+          _callbackImplClass.enterMaintenanceMode(_helixManager, eventInfo);
+        } else {
+          _callbackImplClass.exitMaintenanceMode(_helixManager, eventInfo);
+        }
+      }
+    } else if (operation instanceof UserDefinedCallbackType) {
+      BiConsumer<HelixManager, Object> callback =
+          _property.getUserDefinedCallbackMap().get(operation);
+      if (callback != null) {
+        callback.accept(_helixManager, eventInfo);
+      }
+    } else {
+      // Should not reach here
+      throw new HelixException("Unknown category of cloud event operation " + operation.toString());
+    }
+  }
+
+  @Override
+  public CloudEventListener.ListenerType getListenerType() {
+    return CloudEventListener.ListenerType.UNORDERED;
+  }
+
+  private DefaultCloudEventCallbackImpl loadCloudEventCallbackImplClass(String implClassName)
+      throws IllegalAccessException, InstantiationException {
+    DefaultCloudEventCallbackImpl implClass;
+    try {
+      LOG.info("Loading class: " + implClassName);
+      implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
+          .newInstance();
+    } catch (Exception e) {
+      implClass = DefaultCloudEventCallbackImpl.class.newInstance();
+      LOG.error(
+          "No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
+          implClassName, e.getMessage());
+    }
+    LOG.info("Using {} as cloud event callback impl class.",
+        implClass.getClass().getCanonicalName());
+    return implClass;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d7a3472..22f0330 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -38,6 +38,7 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -70,6 +71,10 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.cloud.event.CloudEventHandlerFactory;
+import org.apache.helix.cloud.event.CloudEventListener;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
+import org.apache.helix.cloud.event.helix.HelixCloudEventListener;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -175,6 +180,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>();
 
   /**
+   * Cloud fields
+   */
+  private CloudEventListener _cloudEventListener;
+
+  /**
    * status dump timer-task
    */
   protected static class StatusDumpTask extends HelixTimerTask {
@@ -816,6 +826,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       }
       throw e;
     }
+
+    if (_helixManagerProperty != null) {
+      HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
+      if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
+        _cloudEventListener =
+            new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
+        CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+      }
+    }
   }
 
   @Override
@@ -861,6 +880,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
       _helixPropertyStore = null;
 
+      if (_cloudEventListener != null) {
+        CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+        _cloudEventListener = null;
+      }
+
       synchronized (this) {
         if (_controller != null) {
           _controller = null;
@@ -1446,6 +1470,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     return _sessionStartTime;
   }
 
+  // Returns the callback property if cloud event callbacks are enabled, otherwise null.
+  private CloudEventCallbackProperty getCloudEventListenerCallbackProperty() {
+    HelixCloudProperty cloudProperty =
+        _helixManagerProperty == null ? null : _helixManagerProperty.getHelixCloudProperty();
+    boolean enabled = cloudProperty != null && cloudProperty.isCloudEventCallbackEnabled();
+    return enabled ? cloudProperty.getCloudEventCallbackProperty() : null;
+  }
+
   /*
    * Prepares connection config and client config based on the internal parameters given to
    * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java
new file mode 100644
index 0000000..60bb12f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java
@@ -0,0 +1,61 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+
+public class MockCloudEventCallbackImpl extends DefaultCloudEventCallbackImpl {
+  public enum OperationType {
+    ON_PAUSE_DISABLE_INSTANCE, // recorded by disableInstance
+    ON_RESUME_ENABLE_INSTANCE, // recorded by enableInstance
+    ON_PAUSE_MAINTENANCE_MODE, // recorded by enterMaintenanceMode
+    ON_RESUME_MAINTENANCE_MODE, // recorded by exitMaintenanceMode
+    PRE_ON_PAUSE, // the PRE_/POST_ values are never recorded by this class itself;
+    POST_ON_PAUSE, // presumably tests add them from user defined callbacks - TODO confirm
+    PRE_ON_RESUME,
+    POST_ON_RESUME
+  }
+
+  public static Set<OperationType> triggeredOperation = new HashSet<>(); // shared, mutable
+
+  @Override
+  public void disableInstance(HelixManager manager, Object eventInfo) {
+    triggeredOperation.add(OperationType.ON_PAUSE_DISABLE_INSTANCE);
+  }
+
+  @Override
+  public void enableInstance(HelixManager manager, Object eventInfo) {
+    triggeredOperation.add(OperationType.ON_RESUME_ENABLE_INSTANCE);
+  }
+
+  @Override
+  public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
+    triggeredOperation.add(OperationType.ON_PAUSE_MAINTENANCE_MODE);
+  }
+
+  @Override
+  public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
+    triggeredOperation.add(OperationType.ON_RESUME_MAINTENANCE_MODE);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
new file mode 100644
index 0000000..70d9e1b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
@@ -0,0 +1,239 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType;
+import org.apache.helix.cloud.event.helix.HelixCloudEventListener;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.CloudConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestCloudEventCallbackProperty {
+  private HelixManager _helixManager;
+  private HelixCloudProperty _cloudProperty;
+  private final static String CLUSTER_NAME = "testCluster";
+
+  /**
+   * Builds the manager under test once for the whole class: a cloud property with cloud event
+   * callbacks enabled is wrapped in a manager property and handed to a
+   * {@link MockEventAwareZKHelixManager}.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Cloud property built from a cloud config whose record is named after the cluster,
+    // with cloud event callbacks switched on
+    CloudConfig cloudConfig = new CloudConfig(new ZNRecord(CLUSTER_NAME));
+    _cloudProperty = new HelixCloudProperty(cloudConfig);
+    _cloudProperty.setCloudEventCallbackEnabled(true);
+
+    // Wrap the cloud property in a manager property
+    HelixManagerProperty.Builder builder = new HelixManagerProperty.Builder();
+    builder.setHelixCloudProperty(_cloudProperty);
+    HelixManagerProperty managerProperty = builder.build();
+
+    // No zk address and no state listener are supplied (both null)
+    _helixManager = new MockEventAwareZKHelixManager(CLUSTER_NAME, "instanceName",
+        InstanceType.PARTICIPANT, null, null, managerProperty);
+  }
+
+  @AfterTest
+  public void afterTest() {
+    _helixManager.disconnect();
+    _cloudProperty.getCloudEventCallbackProperty()
+        .setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, false);
+    _cloudProperty.getCloudEventCallbackProperty()
+        .setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, false);
+    _cloudProperty.getCloudEventCallbackProperty()
+        .unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE);
+    _cloudProperty.getCloudEventCallbackProperty()
+        .unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE);
+    _cloudProperty.getCloudEventCallbackProperty()
+        .unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME);
+    _cloudProperty.getCloudEventCallbackProperty()
+        .unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME);
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+  }
+
+  /**
+   * Checks which Helix operation callbacks of the mock implementation are invoked for pause and
+   * resume events, first with only enable/disable instance turned on and then with maintenance
+   * mode turned on as well.
+   */
+  @Test
+  public void testOptionalHelixOperation() throws Exception {
+    // Short aliases for the operations recorded by the mock callback implementation
+    final MockCloudEventCallbackImpl.OperationType pauseDisableInstance =
+        MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE;
+    final MockCloudEventCallbackImpl.OperationType pauseMaintenanceMode =
+        MockCloudEventCallbackImpl.OperationType.ON_PAUSE_MAINTENANCE_MODE;
+    final MockCloudEventCallbackImpl.OperationType resumeEnableInstance =
+        MockCloudEventCallbackImpl.OperationType.ON_RESUME_ENABLE_INSTANCE;
+    final MockCloudEventCallbackImpl.OperationType resumeMaintenanceMode =
+        MockCloudEventCallbackImpl.OperationType.ON_RESUME_MAINTENANCE_MODE;
+
+    // Callback property using the mock implementation, enable/disable instance turned on
+    CloudEventCallbackProperty callbackProperty = new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            MockCloudEventCallbackImpl.class.getCanonicalName()));
+    callbackProperty.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
+    _cloudProperty.setCloudEventCallbackProperty(callbackProperty);
+
+    _helixManager.connect();
+
+    // Pause event with only enable/disable instance turned on
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(pauseDisableInstance));
+    Assert.assertFalse(callbackTriggered(pauseMaintenanceMode));
+    Assert.assertFalse(callbackTriggered(resumeMaintenanceMode));
+    Assert.assertFalse(callbackTriggered(resumeEnableInstance));
+
+    // Turn on maintenance mode as well and start from an empty record
+    callbackProperty.setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, true);
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Pause event with both operations turned on
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(pauseDisableInstance));
+    Assert.assertTrue(callbackTriggered(pauseMaintenanceMode));
+    Assert.assertFalse(callbackTriggered(resumeMaintenanceMode));
+    Assert.assertFalse(callbackTriggered(resumeEnableInstance));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Resume event with both operations turned on
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(callbackTriggered(pauseDisableInstance));
+    Assert.assertFalse(callbackTriggered(pauseMaintenanceMode));
+    Assert.assertTrue(callbackTriggered(resumeEnableInstance));
+    Assert.assertTrue(callbackTriggered(resumeMaintenanceMode));
+  }
+
+  /**
+   * Registers a user defined callback for each of the four callback types and checks that a
+   * pause event triggers only the pause callbacks and a resume event only the resume callbacks.
+   */
+  @Test
+  public void testUserDefinedCallback() throws Exception {
+    // Start from a clean state
+    afterTest();
+
+    // Short aliases for the markers the user defined callbacks record
+    final MockCloudEventCallbackImpl.OperationType prePause =
+        MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE;
+    final MockCloudEventCallbackImpl.OperationType postPause =
+        MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE;
+    final MockCloudEventCallbackImpl.OperationType preResume =
+        MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME;
+    final MockCloudEventCallbackImpl.OperationType postResume =
+        MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME;
+
+    // Callback property using the mock implementation
+    CloudEventCallbackProperty callbackProperty = new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            MockCloudEventCallbackImpl.class.getCanonicalName()));
+    _cloudProperty.setCloudEventCallbackProperty(callbackProperty);
+
+    _helixManager.connect();
+
+    // The callbacks are registered after connect(); each one just records its own marker
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation.add(prePause));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation.add(postPause));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation.add(preResume));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation.add(postResume));
+
+    // Pause event: only the pause callbacks are recorded
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(prePause));
+    Assert.assertTrue(callbackTriggered(postPause));
+    Assert.assertFalse(callbackTriggered(preResume));
+    Assert.assertFalse(callbackTriggered(postResume));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Resume event: only the resume callbacks are recorded
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(callbackTriggered(prePause));
+    Assert.assertFalse(callbackTriggered(postPause));
+    Assert.assertTrue(callbackTriggered(preResume));
+    Assert.assertTrue(callbackTriggered(postResume));
+  }
+
+  /**
+   * Connects with a callback implementation class name that is meant to be invalid and fires a
+   * pause event. There are no assertions: the test passes as long as neither connecting nor
+   * handling the event throws.
+   */
+  @Test
+  public void testUsingInvalidImplClassName() throws Exception {
+    // Callback property pointing at the deliberately bogus implementation class name
+    String invalidImplClassName = "org.apache.helix.cloud.InvalidClassName";
+    _cloudProperty.setCloudEventCallbackProperty(new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            invalidImplClassName)));
+
+    _helixManager.connect();
+
+    // Fire a pause event by hand
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+  }
+
+  private boolean callbackTriggered(MockCloudEventCallbackImpl.OperationType type) {
+    return MockCloudEventCallbackImpl.triggeredOperation.contains(type);
+  }
+
+  /**
+   * {@link ZKHelixManager} subclass whose {@link #connect()} and {@link #disconnect()} only
+   * register and unregister a {@link HelixCloudEventListener} with the
+   * {@link CloudEventHandlerFactory} instance, so tests can run without connecting to zk.
+   */
+  public static class MockEventAwareZKHelixManager extends ZKHelixManager {
+    private final HelixManagerProperty _helixManagerProperty;
+    // Listener registered by the latest connect(); null while nothing is registered
+    private CloudEventListener _cloudEventListener;
+
+    /**
+     * Use a mock zk helix manager to avoid the need to connect to zk
+     */
+    public MockEventAwareZKHelixManager(String clusterName, String instanceName,
+        InstanceType instanceType, String zkAddress, HelixManagerStateListener stateListener,
+        HelixManagerProperty helixManagerProperty) {
+      super(clusterName, instanceName, instanceType, zkAddress, stateListener,
+          helixManagerProperty);
+      _helixManagerProperty = helixManagerProperty;
+    }
+
+    /**
+     * Registers a new cloud event listener when a cloud property is present and has cloud event
+     * callbacks enabled; otherwise registers nothing.
+     *
+     * <p>A listener left over from an earlier {@code connect()} is unregistered first.
+     * Previously it was simply overwritten, which left a stale listener in the handler that
+     * {@link #disconnect()} could no longer remove.
+     */
+    @Override
+    public void connect() throws IllegalAccessException, InstantiationException {
+      // Drop any listener from a previous connect() so it cannot leak
+      disconnect();
+
+      if (_helixManagerProperty == null) {
+        return;
+      }
+      HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
+      if (helixCloudProperty == null || !helixCloudProperty.isCloudEventCallbackEnabled()) {
+        return;
+      }
+      _cloudEventListener =
+          new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
+      CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+    }
+
+    /**
+     * Unregisters the listener registered by {@link #connect()}, if any. Calling this when no
+     * listener is registered does nothing.
+     */
+    @Override
+    public void disconnect() {
+      if (_cloudEventListener != null) {
+        CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+        _cloudEventListener = null;
+      }
+    }
+  }
+}