You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/03/24 16:51:16 UTC
[helix] 02/02: Enable HelixManager as an event listener (#1978)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-vm-freeze
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3cf41231e1108f2d6250336177eb3851ed53f5fb
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 21 13:31:35 2022 -0700
Enable HelixManager as an event listener (#1978)
Make the Helix manager cloud-event aware by registering a cloud event listener when the Helix manager connects
---
.../java/org/apache/helix/HelixCloudProperty.java | 21 ++
.../helix/cloud/event/CloudEventHandler.java | 32 +--
.../helix/cloud/event/CloudEventListener.java | 24 ++-
.../event/helix/CloudEventCallbackProperty.java | 138 ++++++++++++
.../event/helix/DefaultCloudEventCallbackImpl.java | 69 ++++++
.../cloud/event/helix/HelixCloudEventListener.java | 121 +++++++++++
.../apache/helix/manager/zk/ZKHelixManager.java | 32 +++
.../cloud/event/MockCloudEventCallbackImpl.java | 61 ++++++
.../event/TestCloudEventCallbackProperty.java | 239 +++++++++++++++++++++
9 files changed, 702 insertions(+), 35 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
index 6cbc2e1..9af7175 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
import org.apache.helix.model.CloudConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,10 @@ public class HelixCloudProperty {
// Other customized properties that may be used.
private Properties _customizedCloudProperties = new Properties();
+ private boolean _isCloudEventCallbackEnabled;
+
+ private CloudEventCallbackProperty _cloudEventCallbackProperty;
+
/**
* Initialize Helix Cloud Property based on the provider
* @param
@@ -180,4 +185,20 @@ public class HelixCloudProperty {
public void setCustomizedCloudProperties(Properties customizedCloudProperties) {
_customizedCloudProperties.putAll(customizedCloudProperties);
}
+
+ public boolean isCloudEventCallbackEnabled() {
+ return _isCloudEventCallbackEnabled;
+ }
+
+ public void setCloudEventCallbackEnabled(boolean enabled) {
+ _isCloudEventCallbackEnabled = enabled;
+ }
+
+ public CloudEventCallbackProperty getCloudEventCallbackProperty() {
+ return _cloudEventCallbackProperty;
+ }
+
+ public void setCloudEventCallbackProperty(CloudEventCallbackProperty cloudEventCallbackProperty) {
+ _cloudEventCallbackProperty = cloudEventCallbackProperty;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
index 295194b..5dcbe16 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
public class CloudEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName());
private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>();
- private Optional<CloudEventListener> _preEventHandlerCallback;
- private Optional<CloudEventListener> _postEventHandlerCallback;
+ private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty();
+ private Optional<CloudEventListener> _postEventHandlerCallback = Optional.empty();
/**
* Register an event listener to the event handler.
@@ -70,28 +70,12 @@ public class CloudEventHandler {
}
/**
- * Trigger the callback / listeners in order of
- * 1. PreEventHandlerCallback
- * 2. Unordered CloudEventListener list
- * 3. PostEventHandlerCallback
- * @param eventInfo the object contains any information about the incoming event
+ * Trigger the registered listeners in order,
+ * and trigger the corresponding callback registered in the listeners for a certain type of event.
*/
- public void onPause(Object eventInfo) {
- _preEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo));
- _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onPause(eventInfo));
- _postEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo));
- }
-
- /**
- * Trigger the callback / listeners in order of
- * 1. PreEventHandlerCallback
- * 2. Unordered CloudEventListener list
- * 3. PostEventHandlerCallback
- * @param eventInfo the object contains any information about the incoming event
- */
- public void onResume(Object eventInfo) {
- _preEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo));
- _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onResume(eventInfo));
- _postEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo));
+ public void performAction(Object eventType, Object eventInfo) {
+ _preEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo));
+ _unorderedEventListenerList.parallelStream().forEach(listener -> listener.performAction(eventType, eventInfo));
+ _postEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
index b4adcc3..5e75337 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java
@@ -24,25 +24,27 @@ package org.apache.helix.cloud.event;
* The listeners can be registered to {@link CloudEventHandler}.
*/
public interface CloudEventListener {
- enum ListenerType {
- PRE_EVENT_HANDLER, UNORDERED, POST_EVENT_HANDLER
- }
-
/**
- * Callback for when a pause event is incoming
- * @param eventInfo the info of the incoming event
+ * Defines different listener types
+ * It determines the position at which this listener is triggered in {@link CloudEventHandler}
+ * Order being: PRE_EVENT_HANDLER -> UNORDERED (in parallel) -> POST_EVENT_HANDLER
*/
- void onPause(Object eventInfo);
+ enum ListenerType {
+ PRE_EVENT_HANDLER,
+ UNORDERED,
+ POST_EVENT_HANDLER
+ }
/**
- * Callback for when a pause event finishes
- * @param eventInfo the info of the incoming event
+ * Perform action to react to the event
+ * @param eventType Type of the event
+ * @param eventInfo Detailed information about the event
*/
- void onResume(Object eventInfo);
+ void performAction(Object eventType, Object eventInfo);
/**
* Get the listener type of a listener
- * @return the type of the listener
+ * @return The type of the listener
*/
ListenerType getListenerType();
}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
new file mode 100644
index 0000000..1888eb6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java
@@ -0,0 +1,138 @@
+package org.apache.helix.cloud.event.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.helix.HelixManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A property for users to customize the behavior of a Helix manager as a cloud event listener
+ */
+public class CloudEventCallbackProperty {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CloudEventCallbackProperty.class.getName());
+
+ private Set<HelixOperation> _enabledHelixOperation;
+ private Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>> _userDefinedCallbackMap;
+ private final Map<String, String> _userArgs;
+
+ /**
+ * Constructor
+ * @param userArgs A map containing information that users pass in
+ */
+ public CloudEventCallbackProperty(Map<String, String> userArgs) {
+ _enabledHelixOperation = new HashSet<>();
+ _userDefinedCallbackMap = new HashMap<>();
+ _userArgs = userArgs;
+ }
+
+ /**
+ * Keys for retrieving information from the map users pass in
+ */
+ public static class UserArgsInputKey {
+ public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName";
+ }
+
+ /**
+ * A collection of types of Helix operations
+ */
+ public enum HelixOperation {
+ ENABLE_DISABLE_INSTANCE,
+ MAINTENANCE_MODE
+ }
+
+ /**
+ * A collection of types and positions for user to plug in customized callback
+ */
+ public enum UserDefinedCallbackType {
+ PRE_ON_PAUSE,
+ POST_ON_PAUSE,
+ PRE_ON_RESUME,
+ POST_ON_RESUME,
+ }
+
+ /**
+ * Enable or disable a Helix-supported operation implemented in the callback impl class
+ * @param operation operation type
+ * @param enabled true to enable the operation, false to disable it
+ */
+ public void setHelixOperationEnabled(HelixOperation operation, boolean enabled) {
+ if (enabled) {
+ _enabledHelixOperation.add(operation);
+ } else {
+ _enabledHelixOperation.remove(operation);
+ }
+ }
+
+ /**
+ * Register a user defined callback at a user specified position
+ * The position is relative to Helix operations
+ * There are two options for each type (onPause or onResume):
+ * 1. PRE: The user defined callback will be the first callback being called in the listener
+ * 2. POST: The user defined callback will be the last callback being called in the listener
+ * @param callbackType The type and position for registering the callback
+ * @param callback The implementation of the callback
+ */
+ public void registerUserDefinedCallback(UserDefinedCallbackType callbackType,
+ BiConsumer<HelixManager, Object> callback) {
+ LOG.info("Registering callback {} as {} type user defined callback...", callback,
+ callbackType.name());
+ _userDefinedCallbackMap.put(callbackType, callback);
+ }
+
+ /**
+ * Unregister a user defined callback at a user specified position
+ * @param callbackType The type and position of the callback to unregister
+ */
+ public void unregisterUserDefinedCallback(UserDefinedCallbackType callbackType) {
+ LOG.info("Unregistering {} type user defined callback...", callbackType.name());
+ _userDefinedCallbackMap.remove(callbackType);
+ }
+
+ /**
+ * Get the user passed-in information
+ * @return Empty map if not defined; an unmodifiable view of the map otherwise
+ */
+ public Map<String, String> getUserArgs() {
+ return _userArgs == null ? Collections.emptyMap() : Collections.unmodifiableMap(_userArgs);
+ }
+
+ /**
+ * Get the map where user defined callbacks are stored
+ */
+ public Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>> getUserDefinedCallbackMap() {
+ return _userDefinedCallbackMap;
+ }
+
+ /**
+ * Get the set where enabled Helix operations are stored
+ */
+ public Set<HelixOperation> getEnabledHelixOperation() {
+ return _enabledHelixOperation;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
new file mode 100644
index 0000000..d9aa3b2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -0,0 +1,69 @@
+package org.apache.helix.cloud.event.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * A default callback implementation class to be used in {@link HelixCloudEventListener}
+ */
+public class DefaultCloudEventCallbackImpl {
+
+ /**
+ * Disable the instance
+ * @param manager The helix manager associated with the listener
+ * @param eventInfo Detailed information about the event
+ */
+ public void disableInstance(HelixManager manager, Object eventInfo) {
+ // To be implemented
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Enable the instance
+ * @param manager The helix manager associated with the listener
+ * @param eventInfo Detailed information about the event
+ */
+ public void enableInstance(HelixManager manager, Object eventInfo) {
+ // To be implemented
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Enter maintenance mode (not implemented yet)
+ * @param manager The helix manager associated with the listener
+ * @param eventInfo Detailed information about the event
+ */
+ public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
+ // To be implemented
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Exit maintenance mode (not implemented yet)
+ * @param manager The helix manager associated with the listener
+ * @param eventInfo Detailed information about the event
+ */
+ public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
+ // To be implemented
+ throw new NotImplementedException();
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
new file mode 100644
index 0000000..57d5a43
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
@@ -0,0 +1,121 @@
+package org.apache.helix.cloud.event.helix;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.cloud.event.CloudEventListener;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation;
+import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType;
+
+/**
+ * A helix manager-based cloud event listener implementation
+ */
+public class HelixCloudEventListener implements CloudEventListener {
+ private static Logger LOG = LoggerFactory.getLogger(HelixCloudEventListener.class);
+
+ private final CloudEventCallbackProperty _property;
+ private final DefaultCloudEventCallbackImpl _callbackImplClass;
+ private final HelixManager _helixManager;
+
+ public HelixCloudEventListener(CloudEventCallbackProperty property, HelixManager helixManager)
+ throws InstantiationException, IllegalAccessException {
+ this._property = property;
+ this._helixManager = helixManager;
+ this._callbackImplClass = loadCloudEventCallbackImplClass(property.getUserArgs()
+ .getOrDefault(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+ DefaultCloudEventCallbackImpl.class.getCanonicalName()));
+ }
+
+ /**
+ * The type of incoming event
+ */
+ public enum EventType {
+ ON_PAUSE, ON_RESUME
+ }
+
+ /**
+ * Below are lists defining the type and sequence of callbacks for each type of event
+ */
+ private final List<Object> onPauseOperations = Arrays
+ .asList(UserDefinedCallbackType.PRE_ON_PAUSE, HelixOperation.MAINTENANCE_MODE,
+ HelixOperation.ENABLE_DISABLE_INSTANCE, UserDefinedCallbackType.POST_ON_PAUSE);
+ private final List<Object> onResumeOperations = Arrays
+ .asList(UserDefinedCallbackType.PRE_ON_RESUME, HelixOperation.ENABLE_DISABLE_INSTANCE,
+ HelixOperation.MAINTENANCE_MODE, UserDefinedCallbackType.POST_ON_RESUME);
+
+ @Override
+ public void performAction(Object eventType, Object eventInfo) {
+ LOG.info("Received {} event, event info {}, timestamp {}. Acting on the event... "
+ + "Actor {}, based on callback implementation class {}.", ((EventType) eventType).name(),
+ eventInfo == null ? "N/A" : eventInfo.toString(), System.currentTimeMillis(), _helixManager,
+ _callbackImplClass.getClass().getCanonicalName());
+
+ if (eventType == EventType.ON_PAUSE) {
+ onPauseOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
+ } else if (eventType == EventType.ON_RESUME) {
+ onResumeOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation));
+ }
+ }
+
+ private void executeOperation(Object eventType, Object eventInfo, Object operation) {
+ Set<CloudEventCallbackProperty.HelixOperation> enabledHelixOperationSet =
+ _property.getEnabledHelixOperation();
+ if (HelixOperation.ENABLE_DISABLE_INSTANCE.equals(operation)) {
+ if (enabledHelixOperationSet.contains(HelixOperation.ENABLE_DISABLE_INSTANCE)) {
+ if (eventType == EventType.ON_PAUSE) {
+ _callbackImplClass.disableInstance(_helixManager, eventInfo);
+ } else {
+ _callbackImplClass.enableInstance(_helixManager, eventInfo);
+ }
+ }
+ } else if (HelixOperation.MAINTENANCE_MODE.equals(operation)) {
+ if (enabledHelixOperationSet.contains(HelixOperation.MAINTENANCE_MODE)) {
+ if (eventType == EventType.ON_PAUSE) {
+ _callbackImplClass.enterMaintenanceMode(_helixManager, eventInfo);
+ } else {
+ _callbackImplClass.exitMaintenanceMode(_helixManager, eventInfo);
+ }
+ }
+ } else if (operation instanceof UserDefinedCallbackType) {
+ BiConsumer<HelixManager, Object> callback =
+ _property.getUserDefinedCallbackMap().get(operation);
+ if (callback != null) {
+ callback.accept(_helixManager, eventInfo);
+ }
+ } else {
+ // Should not reach here
+ throw new HelixException("Unknown category of cloud event operation " + operation.toString());
+ }
+ }
+
+ @Override
+ public CloudEventListener.ListenerType getListenerType() {
+ return CloudEventListener.ListenerType.UNORDERED;
+ }
+
+ private DefaultCloudEventCallbackImpl loadCloudEventCallbackImplClass(String implClassName)
+ throws IllegalAccessException, InstantiationException {
+ DefaultCloudEventCallbackImpl implClass;
+ try {
+ LOG.info("Loading class: " + implClassName);
+ implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName)
+ .newInstance();
+ } catch (Exception e) {
+ implClass = DefaultCloudEventCallbackImpl.class.newInstance();
+ LOG.error(
+ "No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.",
+ implClassName, e.getMessage());
+ }
+ LOG.info("Using {} as cloud event callback impl class.",
+ implClass.getClass().getCanonicalName());
+ return implClass;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d7a3472..22f0330 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -38,6 +38,7 @@ import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -70,6 +71,10 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.cloud.event.CloudEventHandlerFactory;
+import org.apache.helix.cloud.event.CloudEventListener;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
+import org.apache.helix.cloud.event.helix.HelixCloudEventListener;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -175,6 +180,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>();
/**
+ * Cloud fields
+ */
+ private CloudEventListener _cloudEventListener;
+
+ /**
* status dump timer-task
*/
protected static class StatusDumpTask extends HelixTimerTask {
@@ -816,6 +826,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
throw e;
}
+
+ if (_helixManagerProperty != null) {
+ HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
+ if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
+ _cloudEventListener =
+ new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
+ CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+ }
+ }
}
@Override
@@ -861,6 +880,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_helixPropertyStore = null;
+ if (_cloudEventListener != null) {
+ CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+ _cloudEventListener = null;
+ }
+
synchronized (this) {
if (_controller != null) {
_controller = null;
@@ -1446,6 +1470,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
return _sessionStartTime;
}
+ private CloudEventCallbackProperty getCloudEventListenerCallbackProperty() {
+ HelixCloudProperty cloudProperty = _helixManagerProperty.getHelixCloudProperty();
+ if (cloudProperty == null || !cloudProperty.isCloudEventCallbackEnabled()) {
+ return null;
+ }
+ return cloudProperty.getCloudEventCallbackProperty();
+ }
+
/*
* Prepares connection config and client config based on the internal parameters given to
* HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java
new file mode 100644
index 0000000..60bb12f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java
@@ -0,0 +1,61 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+
+public class MockCloudEventCallbackImpl extends DefaultCloudEventCallbackImpl {
+ public enum OperationType {
+ ON_PAUSE_DISABLE_INSTANCE,
+ ON_RESUME_ENABLE_INSTANCE,
+ ON_PAUSE_MAINTENANCE_MODE,
+ ON_RESUME_MAINTENANCE_MODE,
+ PRE_ON_PAUSE,
+ POST_ON_PAUSE,
+ PRE_ON_RESUME,
+ POST_ON_RESUME
+ }
+
+ public static Set<OperationType> triggeredOperation = new HashSet<>();
+
+ @Override
+ public void disableInstance(HelixManager manager, Object eventInfo) {
+ triggeredOperation.add(OperationType.ON_PAUSE_DISABLE_INSTANCE);
+ }
+
+ @Override
+ public void enableInstance(HelixManager manager, Object eventInfo) {
+ triggeredOperation.add(OperationType.ON_RESUME_ENABLE_INSTANCE);
+ }
+
+ @Override
+ public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
+ triggeredOperation.add(OperationType.ON_PAUSE_MAINTENANCE_MODE);
+ }
+
+ @Override
+ public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
+ triggeredOperation.add(OperationType.ON_RESUME_MAINTENANCE_MODE);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
new file mode 100644
index 0000000..70d9e1b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java
@@ -0,0 +1,239 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation;
+import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType;
+import org.apache.helix.cloud.event.helix.HelixCloudEventListener;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.CloudConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestCloudEventCallbackProperty {
+ private HelixManager _helixManager;
+ private HelixCloudProperty _cloudProperty;
+ private final static String CLUSTER_NAME = "testCluster";
+
+  /**
+   * Creates the manager used by the tests: a cloud property with cloud event callbacks enabled
+   * is placed into a manager property, which is then handed to a
+   * {@link MockEventAwareZKHelixManager} of type participant.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Cloud property built from a cloud config that wraps a fresh record
+    CloudConfig cloudConfig = new CloudConfig(new ZNRecord(CLUSTER_NAME));
+    _cloudProperty = new HelixCloudProperty(cloudConfig);
+    _cloudProperty.setCloudEventCallbackEnabled(true);
+
+    // Manager property that carries the cloud property
+    HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
+    propertyBuilder.setHelixCloudProperty(_cloudProperty);
+    HelixManagerProperty managerProperty = propertyBuilder.build();
+
+    // Neither a zk address nor a state listener is supplied
+    _helixManager = new MockEventAwareZKHelixManager(CLUSTER_NAME, "instanceName",
+        InstanceType.PARTICIPANT, null, null, managerProperty);
+  }
+
+ @AfterTest
+ public void afterTest() {
+ _helixManager.disconnect();
+ _cloudProperty.getCloudEventCallbackProperty()
+ .setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, false);
+ _cloudProperty.getCloudEventCallbackProperty()
+ .setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, false);
+ _cloudProperty.getCloudEventCallbackProperty()
+ .unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE);
+ _cloudProperty.getCloudEventCallbackProperty()
+ .unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE);
+ _cloudProperty.getCloudEventCallbackProperty()
+ .unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME);
+ _cloudProperty.getCloudEventCallbackProperty()
+ .unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME);
+ MockCloudEventCallbackImpl.triggeredOperation.clear();
+ }
+
+  /**
+   * Checks which markers the mock callback implementation records for pause and resume events
+   * while the Helix operations are switched on one after the other.
+   */
+  @Test
+  public void testOptionalHelixOperation() throws Exception {
+    // Callback property that names the mock implementation; only instance disabling is on
+    String implClassName = MockCloudEventCallbackImpl.class.getCanonicalName();
+    CloudEventCallbackProperty callbackProperty = new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME, implClassName));
+    callbackProperty.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
+    _cloudProperty.setCloudEventCallbackProperty(callbackProperty);
+
+    _helixManager.connect();
+
+    // Short aliases for the markers checked below
+    MockCloudEventCallbackImpl.OperationType pauseDisable =
+        MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE;
+    MockCloudEventCallbackImpl.OperationType pauseMaintenance =
+        MockCloudEventCallbackImpl.OperationType.ON_PAUSE_MAINTENANCE_MODE;
+    MockCloudEventCallbackImpl.OperationType resumeEnable =
+        MockCloudEventCallbackImpl.OperationType.ON_RESUME_ENABLE_INSTANCE;
+    MockCloudEventCallbackImpl.OperationType resumeMaintenance =
+        MockCloudEventCallbackImpl.OperationType.ON_RESUME_MAINTENANCE_MODE;
+
+    // Pause while only ENABLE_DISABLE_INSTANCE is on: just the disable marker is expected
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(pauseDisable));
+    Assert.assertFalse(callbackTriggered(pauseMaintenance));
+    Assert.assertFalse(callbackTriggered(resumeMaintenance));
+    Assert.assertFalse(callbackTriggered(resumeEnable));
+
+    // Additionally switch MAINTENANCE_MODE on and start from an empty marker set
+    callbackProperty.setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, true);
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Pause again: both pause markers are expected, no resume marker
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(pauseDisable));
+    Assert.assertTrue(callbackTriggered(pauseMaintenance));
+    Assert.assertFalse(callbackTriggered(resumeMaintenance));
+    Assert.assertFalse(callbackTriggered(resumeEnable));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Resume: both resume markers are expected, no pause marker
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(callbackTriggered(pauseDisable));
+    Assert.assertFalse(callbackTriggered(pauseMaintenance));
+    Assert.assertTrue(callbackTriggered(resumeEnable));
+    Assert.assertTrue(callbackTriggered(resumeMaintenance));
+  }
+
+  /**
+   * Registers one user defined callback per {@link UserDefinedCallbackType} and checks that a
+   * pause event records only the pause markers and a resume event only the resume markers.
+   */
+  @Test
+  public void testUserDefinedCallback() throws Exception {
+    // Explicit reset: disconnects the manager and clears callbacks and recorded markers
+    afterTest();
+
+    // Callback property that names the mock implementation; no Helix operation is switched on
+    String implClassName = MockCloudEventCallbackImpl.class.getCanonicalName();
+    CloudEventCallbackProperty callbackProperty = new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME, implClassName));
+    _cloudProperty.setCloudEventCallbackProperty(callbackProperty);
+
+    _helixManager.connect();
+
+    // Every user defined callback just records its own marker
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation
+            .add(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation
+            .add(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation
+            .add(MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME));
+    callbackProperty.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME,
+        (manager, eventInfo) -> MockCloudEventCallbackImpl.triggeredOperation
+            .add(MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME));
+
+    MockCloudEventCallbackImpl.OperationType prePause =
+        MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE;
+    MockCloudEventCallbackImpl.OperationType postPause =
+        MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE;
+    MockCloudEventCallbackImpl.OperationType preResume =
+        MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME;
+    MockCloudEventCallbackImpl.OperationType postResume =
+        MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME;
+
+    // Pause event: only the two pause markers are expected
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(prePause));
+    Assert.assertTrue(callbackTriggered(postPause));
+    Assert.assertFalse(callbackTriggered(preResume));
+    Assert.assertFalse(callbackTriggered(postResume));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Resume event: only the two resume markers are expected
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(callbackTriggered(prePause));
+    Assert.assertFalse(callbackTriggered(postPause));
+    Assert.assertTrue(callbackTriggered(preResume));
+    Assert.assertTrue(callbackTriggered(postResume));
+  }
+
+  /**
+   * Connects the manager and performs a pause event while the callback property names an
+   * implementation class of {@code "org.apache.helix.cloud.InvalidClassName"}. The test has no
+   * assertions; it passes as long as no exception escapes.
+   */
+  @Test
+  public void testUsingInvalidImplClassName() throws Exception {
+    // Callback property whose implementation class name is the invalid one
+    String invalidClassName = "org.apache.helix.cloud.InvalidClassName";
+    CloudEventCallbackProperty invalidProperty = new CloudEventCallbackProperty(
+        Collections.singletonMap(
+            CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            invalidClassName));
+    _cloudProperty.setCloudEventCallbackProperty(invalidProperty);
+
+    _helixManager.connect();
+
+    // Manually trigger the pause event
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+  }
+
+  /**
+   * Tells whether the given marker is currently contained in
+   * {@code MockCloudEventCallbackImpl.triggeredOperation}.
+   */
+  private boolean callbackTriggered(MockCloudEventCallbackImpl.OperationType type) {
+    boolean recorded = MockCloudEventCallbackImpl.triggeredOperation.contains(type);
+    return recorded;
+  }
+
+  /**
+   * {@link ZKHelixManager} whose {@code connect()} and {@code disconnect()} are overridden to
+   * only register and unregister a {@link HelixCloudEventListener} with the handler returned by
+   * {@link CloudEventHandlerFactory#getInstance()}.
+   */
+  public static class MockEventAwareZKHelixManager extends ZKHelixManager {
+    private final HelixManagerProperty _helixManagerProperty;
+    // Listener registered by the last connect(); null while nothing is registered
+    private CloudEventListener _cloudEventListener;
+
+    /**
+     * Use a mock zk helix manager to avoid the need to connect to zk
+     */
+    public MockEventAwareZKHelixManager(String clusterName, String instanceName,
+        InstanceType instanceType, String zkAddress, HelixManagerStateListener stateListener,
+        HelixManagerProperty helixManagerProperty) {
+      super(clusterName, instanceName, instanceType, zkAddress, stateListener,
+          helixManagerProperty);
+      _helixManagerProperty = helixManagerProperty;
+    }
+
+    /**
+     * Registers a new {@link HelixCloudEventListener} for this manager when a manager property
+     * with a cloud property is present and cloud event callbacks are enabled on it; otherwise
+     * nothing is registered. A listener registered by an earlier call is unregistered first.
+     */
+    @Override
+    public void connect() throws IllegalAccessException, InstantiationException {
+      // Release the listener of a previous connect(). Overwriting the field without doing so
+      // would leave that listener registered with no reference left to unregister it.
+      disconnect();
+
+      if (_helixManagerProperty == null) {
+        return;
+      }
+      HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
+      if (helixCloudProperty == null || !helixCloudProperty.isCloudEventCallbackEnabled()) {
+        return;
+      }
+
+      _cloudEventListener =
+          new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
+      CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+    }
+
+    /**
+     * Unregisters the listener registered by {@link #connect()}, if any, and forgets it.
+     * Calling this method without a registered listener does nothing.
+     */
+    @Override
+    public void disconnect() {
+      if (_cloudEventListener == null) {
+        return;
+      }
+      CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+      _cloudEventListener = null;
+    }
+  }
+}