You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:35 UTC
[helix] 01/03: Controller-side Task Current State Migration (#1550)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit a18deb06c28e25e79a9a773069a79ce21a2399b3
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Tue Dec 8 14:31:12 2020 -0800
Controller-side Task Current State Migration (#1550)
First part of the task current state migration.
All changes made in this PR are on the controller side
and directly affect the controller pipelines.
---
.../main/java/org/apache/helix/HelixConstants.java | 1 +
.../main/java/org/apache/helix/HelixManager.java | 11 ++++
.../main/java/org/apache/helix/PropertyKey.java | 45 +++++++++++++++++
.../java/org/apache/helix/PropertyPathBuilder.java | 26 ++++++++++
.../main/java/org/apache/helix/PropertyType.java | 1 +
.../listeners/TaskCurrentStateChangeListener.java} | 40 +++++++--------
.../helix/common/caches/TaskCurrentStateCache.java | 59 ++++++++++++++++++++++
.../helix/controller/GenericHelixController.java | 17 +++++++
.../dataproviders/BaseControllerDataProvider.java | 35 ++++++++++++-
.../WorkflowControllerDataProvider.java | 16 +++++-
.../helix/controller/stages/ClusterEventType.java | 1 +
.../stages/CurrentStateComputationStage.java | 6 +--
.../stages/ResourceComputationStage.java | 5 +-
.../apache/helix/manager/zk/CallbackHandler.java | 21 +++++++-
.../helix/manager/zk/ParticipantManager.java | 3 ++
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 2 +
.../helix/manager/zk/ZKHelixDataAccessor.java | 4 ++
.../apache/helix/manager/zk/ZKHelixManager.java | 8 +++
.../org/apache/helix/TestPropertyPathBuilder.java | 7 +++
.../test/java/org/apache/helix/TestZKCallback.java | 23 +++++++++
.../java/org/apache/helix/common/ZkTestBase.java | 7 +++
.../waged/model/AbstractTestClusterModel.java | 1 +
.../stages/TestCurrentStateComputationStage.java | 46 ++++++++++++++++-
.../stages/TestResourceComputationStage.java | 28 ++++++++++
.../integration/TestZkCallbackHandlerLeak.java | 35 ++++++++++---
.../manager/TestParticipantManager.java | 2 +-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 2 +
27 files changed, 409 insertions(+), 43 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 0fcdfe6..445b32c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -34,6 +34,7 @@ public interface HelixConstants {
CLUSTER_CONFIG (PropertyType.CONFIGS),
LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
CURRENT_STATE (PropertyType.CURRENTSTATES),
+ TASK_CURRENT_STATE (PropertyType.TASKCURRENTSTATES),
CUSTOMIZED_STATE_ROOT (PropertyType.CUSTOMIZEDSTATES),
CUSTOMIZED_STATE (PropertyType.CUSTOMIZEDSTATES),
MESSAGE (PropertyType.MESSAGES),
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 06a4ec5..4ce3ff9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -230,6 +230,17 @@ public interface HelixManager {
String sessionId) throws Exception;
/**
+ * Uses CurrentStateChangeListener since TaskCurrentState shares the same CurrentState model.
+ * @see CurrentStateChangeListener#onStateChange(String, List, NotificationContext)
+ * @param listener must also be a TaskCurrentStateChangeListener (CallbackHandler casts to it)
+ * @param instanceName instance whose task current states are watched, for the given sessionId
+ */
+ default void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName, String sessionId) throws Exception {
+ throw new UnsupportedOperationException("Not implemented"); // default; ZKHelixManager overrides
+ }
+
+ /**
* @see CustomizedStateRootChangeListener#onCustomizedStateRootChange(String, NotificationContext)
* @param listener
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 6437f1a..0427068 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -74,6 +74,7 @@ import static org.apache.helix.PropertyType.STATEMODELDEFS;
import static org.apache.helix.PropertyType.STATUSUPDATES;
import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
import static org.apache.helix.PropertyType.TARGETEXTERNALVIEW;
+import static org.apache.helix.PropertyType.TASKCURRENTSTATES;
/**
@@ -486,6 +487,50 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with {@link CurrentState} of an instance and session. This key
+ * is for TaskCurrentState specifically.
+ * @param instanceName name of the instance
+ * @param sessionId session id of the instance
+ * @return {@link PropertyKey} of type TASKCURRENTSTATES for the instance and session
+ */
+ public PropertyKey taskCurrentStates(String instanceName, String sessionId) {
+ return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+ sessionId);
+ }
+
+ /**
+ * Get a property key associated with {@link CurrentState} of an instance, session, and
+ * job. This key is for TaskCurrentState specifically.
+ * @param instanceName name of the instance
+ * @param sessionId session id of the instance
+ * @param jobName name of the job
+ * @return {@link PropertyKey} of type TASKCURRENTSTATES for the instance, session, and job
+ */
+ public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName) {
+ return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+ sessionId, jobName);
+ }
+
+ /**
+ * Get a property key associated with {@link CurrentState} of an instance, session, job,
+ * and bucket name. This key is for TaskCurrentState specifically.
+ * @param instanceName name of the instance
+ * @param sessionId session id of the instance
+ * @param jobName name of the job
+ * @param bucketName name of the bucket; when null, the key is built without a bucket component
+ * @return {@link PropertyKey} of type TASKCURRENTSTATES
+ */
+ public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName,
+ String bucketName) {
+ if (bucketName == null) { // no bucket: same key as the three-argument overload builds
+ return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+ sessionId, jobName);
+ }
+ return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+ sessionId, jobName, bucketName);
+ }
+
+ /**
* Get a property key associated with the root of {@link CustomizedState} of an instance
* @param instanceName
* @return {@link PropertyKey}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index d4cec13..49e765d 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -54,6 +54,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
import static org.apache.helix.PropertyType.PAUSE;
import static org.apache.helix.PropertyType.STATEMODELDEFS;
import static org.apache.helix.PropertyType.STATUSUPDATES;
+import static org.apache.helix.PropertyType.TASKCURRENTSTATES;
import static org.apache.helix.PropertyType.WORKFLOWCONTEXT;
@@ -125,6 +126,14 @@ public class PropertyPathBuilder {
"/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}");
addEntry(PropertyType.CURRENTSTATES, 5,
"/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}/{bucketName}");
+ addEntry(TASKCURRENTSTATES, 2,
+ "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES");
+ addEntry(TASKCURRENTSTATES, 3,
+ "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}");
+ addEntry(TASKCURRENTSTATES, 4,
+ "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}");
+ addEntry(TASKCURRENTSTATES, 5,
+ "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}/{bucketName}");
addEntry(PropertyType.CUSTOMIZEDSTATES, 2,
"/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES");
addEntry(PropertyType.CUSTOMIZEDSTATES, 3,
@@ -346,6 +355,23 @@ public class PropertyPathBuilder {
sessionId, resourceName);
}
+ public static String instanceTaskCurrentState(String clusterName, String instanceName) {
+ return String.format("/%s/INSTANCES/%s/TASKCURRENTSTATES", clusterName, instanceName); // TASKCURRENTSTATES root of the instance
+ }
+
+ public static String instanceTaskCurrentState(String clusterName, String instanceName,
+ String sessionId) {
+ return String
+ .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s", clusterName, instanceName, sessionId); // per-session node under the instance's TASKCURRENTSTATES
+ }
+
+ public static String instanceTaskCurrentState(String clusterName, String instanceName,
+ String sessionId, String resourceName) {
+ return String
+ .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s/%s", clusterName, instanceName, sessionId,
+ resourceName); // per-resource node under the session node
+ }
+
public static String instanceCustomizedState(String clusterName, String instanceName) {
return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES", clusterName, instanceName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 9ce27f3..bedf79e 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -52,6 +52,7 @@ public enum PropertyType {
// INSTANCE PROPERTIES
MESSAGES(Type.INSTANCE, true, true, true),
CURRENTSTATES(Type.INSTANCE, true, true, false, false, true),
+ TASKCURRENTSTATES(Type.INSTANCE, true, true, false, false, true),
STATUSUPDATES(Type.INSTANCE, true, true, false, false, false, true),
ERRORS(Type.INSTANCE, true, true),
INSTANCE_HISTORY(Type.INSTANCE, true, true, true),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
copy to helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
index 4c8a60e..5f00eae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
@@ -1,4 +1,4 @@
-package org.apache.helix.controller.stages;
+package org.apache.helix.api.listeners;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,24 +19,22 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-public enum ClusterEventType {
- IdealStateChange,
- CurrentStateChange,
- CustomizedStateChange,
- ConfigChange,
- ClusterConfigChange,
- ResourceConfigChange,
- InstanceConfigChange,
- CustomizeStateConfigChange,
- LiveInstanceChange,
- MessageChange,
- ExternalViewChange,
- CustomizedViewChange,
- TargetExternalViewChange,
- Resume,
- PeriodicalRebalance,
- OnDemandRebalance,
- RetryRebalance,
- StateVerifier,
- Unknown
+import java.util.List;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.CurrentState;
+
+/**
+ * Interface to implement to respond to changes in task current states (TASK_CURRENT_STATE).
+ */
+public interface TaskCurrentStateChangeListener {
+
+ /**
+ * Invoked when task current states change
+ * @param instanceName name of the instance whose states changed
+ * @param statesInfo a list of the task current states, as supplied by the caller of this callback
+ * @param changeContext the change event and state
+ */
+ void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+ NotificationContext changeContext);
}
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java
new file mode 100644
index 0000000..936f6d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java
@@ -0,0 +1,59 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.controllers.ControlContextProvider;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+
+
+/**
+ * Cache to hold the task CurrentStates of the live instances of a cluster.
+ */
+public class TaskCurrentStateCache extends ParticipantStateCache<CurrentState> {
+ public TaskCurrentStateCache(ControlContextProvider controlContextProvider) {
+ super(controlContextProvider);
+ }
+
+ @Override
+ protected Set<PropertyKey> PopulateParticipantKeys(HelixDataAccessor accessor,
+ Map<String, LiveInstance> liveInstanceMap) {
+ Set<PropertyKey> participantStateKeys = new HashSet<>();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ for (String instanceName : liveInstanceMap.keySet()) {
+ LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+ String sessionId = liveInstance.getEphemeralOwner(); // the ephemeral owner is used as the session id
+ List<String> currentStateNames =
+ accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId)); // children of TASKCURRENTSTATES/{sessionId}
+ for (String currentStateName : currentStateNames) {
+ participantStateKeys
+ .add(keyBuilder.taskCurrentState(instanceName, sessionId, currentStateName)); // one key per child name
+ }
+ }
+ return participantStateKeys;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3673a07..11ca5a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -58,6 +58,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
import org.apache.helix.common.ClusterEventBlockingQueue;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
@@ -128,6 +129,7 @@ import static org.apache.helix.HelixConstants.ChangeType;
*/
public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener,
MessageListener, CurrentStateChangeListener,
+ TaskCurrentStateChangeListener,
CustomizedStateRootChangeListener,
CustomizedStateChangeListener,
CustomizedStateConfigChangeListener, ControllerChangeListener,
@@ -569,6 +571,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
rebalancePipeline);
registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
rebalancePipeline);
+ registry.register(ClusterEventType.TaskCurrentStateChange, dataRefresh, dataPreprocess,
+ rebalancePipeline);
registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
@@ -905,6 +909,17 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
@Override
@PreFetch(enabled = false)
+ public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+ NotificationContext changeContext) { // statesInfo is not read in this method
+ logger.info("START: GenericClusterController.onTaskCurrentStateChange()");
+ notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE);
+ pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections
+ .<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName)); // the only event attribute is the instance name
+ logger.info("END: GenericClusterController.onTaskCurrentStateChange()");
+ }
+
+ @Override
+ @PreFetch(enabled = false)
public void onCustomizedStateRootChange(String instanceName, List<String> customizedStateTypes,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onCustomizedStateRootChange()");
@@ -1243,6 +1258,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
// remove current-state listener for expired session
String instanceName = lastSessions.get(session).getInstanceName();
manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+ manager.removeListener(keyBuilder.taskCurrentStates(instanceName, session), this);
}
}
}
@@ -1264,6 +1280,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
try {
// add current-state listeners for new sessions
manager.addCurrentStateChangeListener(this, instanceName, session);
+ manager.addTaskCurrentStateChangeListener(this, instanceName, session);
logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+ instanceName + ", session: " + session + ", listener: " + this);
} catch (Exception e) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index c2c9078..070c70a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.common.caches.AbstractDataCache;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.common.caches.InstanceMessagesCache;
import org.apache.helix.common.caches.PropertyCache;
+import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
@@ -58,6 +59,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
// Special caches
private CurrentStateCache _currentStateCache;
+ protected TaskCurrentStateCache _taskCurrentStateCache;
private InstanceMessagesCache _instanceMessagesCache;
// Other miscellaneous caches
@@ -226,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
}
}, false);
_currentStateCache = new CurrentStateCache(this);
+ _taskCurrentStateCache = new TaskCurrentStateCache(this);
_instanceMessagesCache = new InstanceMessagesCache(_clusterName);
}
@@ -572,13 +576,40 @@ public class BaseControllerDataProvider implements ControlContextProvider {
/**
* Provides the current state of the node for a given session id, the sessionid can be got from
- * LiveInstance
+ * LiveInstance. This function is only called from the regular pipelines.
* @param instanceName
* @param clientSessionId
* @return
*/
public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
- return _currentStateCache.getParticipantState(instanceName, clientSessionId);
+ return getCurrentState(instanceName, clientSessionId, false);
+ }
+
+ /**
+ * Provides the current state of the node for a given session id, the sessionid can be got from
+ * LiveInstance. Task pipelines get task and regular states merged; others get non-task states.
+ * @param instanceName name of the instance
+ * @param clientSessionId session id of the instance; isTaskPipeline selects the task-pipeline view
+ * @return unmodifiable merged map if isTaskPipeline, otherwise a filtered copy of regular states
+ */
+ public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId,
+ boolean isTaskPipeline) {
+ Map<String, CurrentState> regularCurrentStates =
+ _currentStateCache.getParticipantState(instanceName, clientSessionId);
+ if (isTaskPipeline) {
+ // TODO: Targeted jobs still rely on regular resource current states, so need to include all
+ // resource current states without filtering. For now, allow regular current states to
+ // overwrite task current states in case of name conflicts, which are unlikely. Eventually,
+ // it should be completely split.
+ Map<String, CurrentState> mergedCurrentStates = new HashMap<>();
+ mergedCurrentStates
+ .putAll(_taskCurrentStateCache.getParticipantState(instanceName, clientSessionId));
+ mergedCurrentStates.putAll(regularCurrentStates); // added last, so regular states win on conflicts
+ return Collections.unmodifiableMap(mergedCurrentStates);
+ }
+ return regularCurrentStates.entrySet().stream().filter(
+ entry -> !TaskConstants.STATE_MODEL_NAME.equals(entry.getValue().getStateModelDefRef())) // drop states using the task state model
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 45e1319..96894e8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -26,6 +26,8 @@ import java.util.Set;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.common.caches.TaskCurrentStateCache;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.AbstractDataCache;
import org.apache.helix.common.caches.TaskDataCache;
@@ -78,14 +80,15 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
// This check (and set) is necessary for now since the current state flag in
// _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+ || _propertyDataChangedMap.get(HelixConstants.ChangeType.TASK_CURRENT_STATE).getAndSet(false)
|| _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
- || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
|| propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
}
public synchronized void refresh(HelixDataAccessor accessor) {
long startTime = System.currentTimeMillis();
Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+ _taskCurrentStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap());
refreshClusterStateChangeFlags(propertyRefreshed);
@@ -252,6 +255,17 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
return _existsLiveInstanceOrCurrentStateOrMessageChange;
}
+ /**
+ * For a certain session, return the task current states on the node, read from the task cache.
+ * @param instanceName name of the instance
+ * @param clientSessionId session id of the instance
+ * @return A mapping of resource names to CurrentStates
+ */
+ public Map<String, CurrentState> getTaskCurrentState(String instanceName,
+ String clientSessionId) {
+ return _taskCurrentStateCache.getParticipantState(instanceName, clientSessionId);
+ }
+
@Override
public String toString() {
StringBuilder sb = genCacheContentStringBuilder();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 4c8a60e..cd0ce60 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
public enum ClusterEventType {
IdealStateChange,
CurrentStateChange,
+ TaskCurrentStateChange,
CustomizedStateChange,
ConfigChange,
ClusterConfigChange,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4af7e27..49e5d8f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -89,9 +89,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
String instanceSessionId = instance.getEphemeralOwner();
// update current states.
- Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
- instanceSessionId);
- updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
+ updateCurrentStates(instance,
+ cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline).values(),
+ currentStateOutput, resourceMap);
Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
// update pending messages
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 7c2cac6..1f77fa6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -190,10 +190,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
String clientSessionId = instance.getEphemeralOwner();
Map<String, CurrentState> currentStateMap =
- cache.getCurrentState(instanceName, clientSessionId);
- if (currentStateMap == null || currentStateMap.size() == 0) {
- continue;
- }
+ cache.getCurrentState(instanceName, clientSessionId, isTaskCache);
for (CurrentState currentState : currentStateMap.values()) {
String resourceName = currentState.getResourceName();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 03c5f73..24b42af 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -61,6 +61,7 @@ import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
@@ -102,6 +103,7 @@ import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
import static org.apache.helix.HelixConstants.ChangeType.RESOURCE_CONFIG;
import static org.apache.helix.HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.TASK_CURRENT_STATE;
@PreFetchChangedData(enabled = false)
public class CallbackHandler implements IZkChildListener, IZkDataListener {
@@ -261,6 +263,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
case CURRENT_STATE:
listenerClass = CurrentStateChangeListener.class;
break;
+ case TASK_CURRENT_STATE:
+ listenerClass = TaskCurrentStateChangeListener.class;
+ break;
case CUSTOMIZED_STATE_ROOT:
listenerClass = CustomizedStateRootChangeListener.class;
break;
@@ -415,6 +420,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
List<CurrentState> currentStates = preFetch(_propertyKey);
currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
+ } else if (_changeType == TASK_CURRENT_STATE) {
+ TaskCurrentStateChangeListener taskCurrentStateChangeListener =
+ (TaskCurrentStateChangeListener) _listener;
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+ List<CurrentState> currentStates = preFetch(_propertyKey);
+ taskCurrentStateChangeListener
+ .onTaskCurrentStateChange(instanceName, currentStates, changeContext);
+
} else if (_changeType == CUSTOMIZED_STATE_ROOT) {
CustomizedStateRootChangeListener customizedStateRootChangeListener =
(CustomizedStateRootChangeListener) _listener;
@@ -527,8 +540,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
_zkClient.unsubscribeChildChanges(path, this);
}
- // List of children could be empty, but won't be null.
- return _zkClient.getChildren(path);
+ try {
+ return _zkClient.getChildren(path);
+ } catch (ZkNoNodeException e) {
+ return null;
+ }
}
private void subscribeDataChange(String path, NotificationContext.Type callbackType) {
@@ -571,6 +587,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
try {
switch (_changeType) {
case CURRENT_STATE:
+ case TASK_CURRENT_STATE:
case CUSTOMIZED_STATE:
case IDEAL_STATE:
case EXTERNAL_VIEW:
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 937fb4b..3a58570 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -418,6 +418,9 @@ public class ParticipantManager {
String path = _keyBuilder.currentStates(_instanceName, session).getPath();
LOG.info("Removing current states from previous sessions. path: " + path);
_zkclient.deleteRecursively(path);
+ path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath();
+ LOG.info("Removing task current states from previous sessions. path: " + path);
+ _zkclient.deleteRecursively(path);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 7b946b1..df5abe4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -193,6 +193,8 @@ public class ZKHelixAdmin implements HelixAdmin {
_zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true);
+ _zkClient
+ .createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 35cc663..203c6c7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -186,6 +186,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
boolean success = false;
switch (type) {
case CURRENTSTATES:
+ case TASKCURRENTSTATES:
case CUSTOMIZEDSTATES:
success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
break;
@@ -243,6 +244,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
switch (type) {
case CURRENTSTATES:
+ case TASKCURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
@@ -301,6 +303,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
switch (type) {
case CURRENTSTATES:
+ case TASKCURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
@@ -421,6 +424,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
for (ZNRecord record : children) {
switch (type) {
case CURRENTSTATES:
+ case TASKCURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
if (record != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 05e1e26..04f4385 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -591,6 +591,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
@Override
+ public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName, String sessionId) throws Exception {
+ addListener(listener, new Builder(_clusterName).taskCurrentStates(instanceName, sessionId),
+ ChangeType.TASK_CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged
+ });
+ }
+
+ @Override
public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
String instanceName) throws Exception {
addListener(listener, new Builder(_clusterName).customizedStatesRoot(instanceName),
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
index 1f8df6c..422fb9c 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
@@ -39,6 +39,13 @@ public class TestPropertyPathBuilder {
actual = PropertyPathBuilder.instanceCurrentState("test_cluster", "instanceName1", "sessionId");
AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId");
+ actual = PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1");
+ AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES");
+ actual =
+ PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1", "sessionId");
+ AssertJUnit
+ .assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES/sessionId");
+
actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1");
AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CUSTOMIZEDSTATES");
actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1", "customizedState1");
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index d97471c..5dc0171 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -32,6 +32,7 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.ExternalView;
@@ -58,6 +59,7 @@ public class TestZKCallback extends ZkUnitTestBase {
public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
ConfigChangeListener, CurrentStateChangeListener,
+ TaskCurrentStateChangeListener,
CustomizedStateConfigChangeListener,
CustomizedStateRootChangeListener,
ExternalViewChangeListener,
@@ -66,6 +68,7 @@ public class TestZKCallback extends ZkUnitTestBase {
boolean liveInstanceChangeReceived = false;
boolean configChangeReceived = false;
boolean currentStateChangeReceived = false;
+ boolean taskCurrentStateChangeReceived = false;
boolean customizedStateConfigChangeReceived = false;
boolean customizedStateRootChangeReceived = false;
boolean messageChangeReceived = false;
@@ -84,6 +87,12 @@ public class TestZKCallback extends ZkUnitTestBase {
}
@Override
+ public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+ NotificationContext changeContext) {
+ taskCurrentStateChangeReceived = true; // only records that the callback fired; arguments are not inspected
+ }
+
+ @Override
public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
configChangeReceived = true;
}
@@ -149,6 +158,8 @@ public class TestZKCallback extends ZkUnitTestBase {
testHelixManager.addMessageListener(testListener, "localhost_8900");
testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
testHelixManager.getSessionId());
+ testHelixManager.addTaskCurrentStateChangeListener(testListener, "localhost_8900",
+ testHelixManager.getSessionId());
testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900");
testHelixManager.addConfigChangeListener(testListener);
testHelixManager.addIdealStateChangeListener(testListener);
@@ -184,6 +195,18 @@ public class TestZKCallback extends ZkUnitTestBase {
Assert.assertTrue(result);
testListener.Reset();
+ CurrentState taskCurState = new CurrentState("db-12345");
+ taskCurState.setSessionId("sessionId");
+ taskCurState.setStateModelDefRef("StateModelDef");
+ accessor.setProperty(keyBuilder
+ .taskCurrentState("localhost_8900", testHelixManager.getSessionId(),
+ taskCurState.getId()), taskCurState);
+ result = TestHelper.verify(() -> {
+ return testListener.taskCurrentStateChangeReceived;
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(result);
+ testListener.Reset();
+
IdealState idealState = new IdealState("db-1234");
idealState.setNumPartitions(400);
idealState.setReplicas(Integer.toString(2));
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 0b1c375..e832307 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -929,6 +929,13 @@ public class ZkTestBase {
LOG.error("Current state not empty for " + participant);
return false;
}
+ CurrentState taskCurrentState =
+ accessor.getProperty(keyBuilder.taskCurrentState(participant, sessionId, _resourceName));
+ Map<String, String> taskPartitionStateMap = taskCurrentState.getPartitionStateMap();
+ if (taskPartitionStateMap != null && !taskPartitionStateMap.isEmpty()) {
+ LOG.error("Task current state not empty for " + participant);
+ return false;
+ }
}
}
return true;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 8887e87..e3b346d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -154,6 +154,7 @@ public abstract class AbstractTestClusterModel {
currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+ when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
// 5. Set up the resource config for the two resources with the partition weight.
Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 9658e2a..91e275a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -19,9 +19,11 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashMap;
import java.util.Map;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.CurrentState;
@@ -99,11 +101,28 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
stateWithDeadSession.setStateModelDefRef("MasterSlave");
stateWithDeadSession.setState("testResourceName_1", "MASTER");
+ ZNRecord record3 = new ZNRecord("testTaskResourceName");
+ CurrentState taskStateWithLiveSession = new CurrentState(record3);
+ taskStateWithLiveSession.setSessionId("session_3");
+ taskStateWithLiveSession.setStateModelDefRef("Task");
+ taskStateWithLiveSession.setState("testTaskResourceName_1", "INIT");
+ ZNRecord record4 = new ZNRecord("testTaskResourceName");
+ CurrentState taskStateWithDeadSession = new CurrentState(record4);
+ taskStateWithDeadSession.setSessionId("session_dead");
+ taskStateWithDeadSession.setStateModelDefRef("Task");
+ taskStateWithDeadSession.setState("testTaskResourceName_1", "INIT");
+
accessor.setProperty(keyBuilder.currentState("localhost_3", "session_3", "testResourceName"),
stateWithLiveSession);
- accessor.setProperty(
- keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
+ accessor.setProperty(keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
stateWithDeadSession);
+ accessor.setProperty(
+ keyBuilder.taskCurrentState("localhost_3", "session_3", "testTaskResourceName"),
+ taskStateWithLiveSession);
+ accessor.setProperty(
+ keyBuilder.taskCurrentState("localhost_3", "session_dead", "testTaskResourceName"),
+ taskStateWithDeadSession);
+
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -111,6 +130,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
"localhost_3");
AssertJUnit.assertEquals(currentState, "OFFLINE");
+ // Non Task Framework event will cause task current states to be ignored
+ String taskCurrentState = output3
+ .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
+ "localhost_3");
+ AssertJUnit.assertNull(taskCurrentState);
// Add another state transition message which is stale
message = new Message(Message.MessageType.STATE_TRANSITION, "msg2");
@@ -128,6 +152,24 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
AssertJUnit.assertEquals(dataCache.getStaleMessages().size(), 1);
AssertJUnit.assertTrue(dataCache.getStaleMessages().containsKey("localhost_3"));
AssertJUnit.assertTrue(dataCache.getStaleMessages().get("localhost_3").containsKey("msg2"));
+
+ // Use a task event to check that task current states are included
+ resourceMap = new HashMap<String, Resource>();
+ Resource testTaskResource = new Resource("testTaskResourceName");
+ testTaskResource.setStateModelDefRef("Task");
+ testTaskResource.addPartition("testTaskResourceName_1");
+ resourceMap.put("testTaskResourceName", testTaskResource);
+ ClusterEvent taskEvent = new ClusterEvent(ClusterEventType.Unknown);
+ taskEvent.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ taskEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
+ new WorkflowControllerDataProvider());
+ runStage(taskEvent, new ReadClusterDataStage());
+ runStage(taskEvent, stage);
+ CurrentStateOutput output5 = taskEvent.getAttribute(AttributeName.CURRENT_STATE.name());
+ taskCurrentState = output5
+ .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
+ "localhost_3");
+ AssertJUnit.assertEquals(taskCurrentState, "INIT");
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 440543e..6b6f28e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.StageContext;
@@ -154,6 +155,14 @@ public class TestResourceComputationStage extends BaseStageTest {
accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
currentState);
+ String oldTaskResource = "testTaskResourceOld";
+ CurrentState taskCurrentState = new CurrentState(oldTaskResource);
+ taskCurrentState.setState("testTaskResourceOld_0", "RUNNING");
+ taskCurrentState.setState("testTaskResourceOld_1", "FINISHED");
+ taskCurrentState.setStateModelDefRef("Task");
+ accessor.setProperty(keyBuilder.taskCurrentState(instanceName, sessionId, oldTaskResource),
+ taskCurrentState);
+
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
ResourceComputationStage stage = new ResourceComputationStage();
@@ -185,6 +194,25 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+
+ event.addAttribute(AttributeName.ControllerDataProvider.name(),
+ new WorkflowControllerDataProvider());
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, stage);
+
+ resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+ // +2 because it will have current state and task current state
+ AssertJUnit.assertEquals(resources.length + 2, resourceMap.size());
+
+ Resource taskResource = resourceMap.get(oldTaskResource);
+ AssertJUnit.assertNotNull(taskResource);
+ AssertJUnit.assertEquals(taskResource.getResourceName(), oldTaskResource);
+ AssertJUnit
+ .assertEquals(taskResource.getStateModelDefRef(), taskCurrentState.getStateModelDefRef());
+ AssertJUnit.assertEquals(taskResource.getPartitions().size(),
+ taskCurrentState.getPartitionStateMap().size());
+ AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_0"));
+ AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_1"));
}
@Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 2f27d4b..7bc92dc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.AccessOption;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
@@ -43,6 +44,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.slf4j.Logger;
@@ -62,6 +64,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
String clusterName = className + "_" + methodName;
final int n = 2;
final int r = 2;
+ final int taskResourceCount = 2;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -78,6 +81,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// start participants
MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
@@ -85,6 +89,13 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
+
+ // Manually set up task current states
+ for (int j = 0; j < taskResourceCount; j++) {
+ _baseAccessor.create(keyBuilder
+ .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j)
+ .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT);
+ }
}
ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
@@ -105,7 +116,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// System.out.println("controller watch paths: " + watchPaths);
// where n is number of nodes and r is number of resources
- return watchPaths.size() == (8 + r + ( 5 + r) * n);
+ return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
}
}, 2000);
Assert.assertTrue(result, "Controller has incorrect number of zk-watchers.");
@@ -130,8 +141,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// printHandlers(participantManagerToExpire);
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, 14,
- "HelixController should have 14 (8+3n) callback handlers for 2 (n) participant");
+ Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
+ "HelixController should have 16 (8+4n) callback handlers for 2 (n) participant");
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
@@ -160,7 +171,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// System.out.println("controller watch paths after session expiry: " + watchPaths);
// where n is number of nodes and r is number of resources
- return watchPaths.size() == (8 + r + ( 5 + r) * n);
+ // one participant is disconnected, and its task current states are removed
+ return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n - 1) + 6 + r);
}
}, 2000);
Assert.assertTrue(result, "Controller has incorrect number of zk-watchers after session expiry.");
@@ -208,6 +220,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
String clusterName = className + "_" + methodName;
final int n = 2;
final int r = 1;
+ final int taskResourceCount = 1;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -224,6 +237,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// start participants
MockParticipantManager[] participants = new MockParticipantManager[n];
for (int i = 0; i < n; i++) {
@@ -231,6 +245,12 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
+ // Manually set up task current states
+ for (int j = 0; j < taskResourceCount; j++) {
+ _baseAccessor.create(keyBuilder
+ .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j)
+ .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT);
+ }
}
ZkHelixClusterVerifier verifier =
@@ -256,8 +276,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManager.getHandlers().size();
- Assert.assertEquals(controllerHandlerNb, 8 + 3 * n,
- "HelixController should have 14 (8+3n) callback handlers for 2 participant, but was "
+ Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
+ "HelixController should have 16 (8+4n) callback handlers for 2 participant, but was "
+ controllerHandlerNb + ", " + printHandlers(controller));
Assert.assertEquals(particHandlerNb, 1,
"HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
@@ -285,7 +305,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
System.err.println("controller watch paths after session expiry: " + watchPaths.size());
// where r is number of resources and n is number of nodes
- int expected = (8 + r + (5 + r) * n);
+ // task resource count does not contribute to ideal state watch paths
+ int expected = (8 + r + (6 + r + taskResourceCount) * n);
return watchPaths.size() == expected;
}
}, 2000);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 25523d0..050bc76 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -210,7 +210,7 @@ public class TestParticipantManager extends ZkTestBase {
// check HelixCallback Monitor
Set<ObjectInstance> objs =
_server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null);
- Assert.assertEquals(objs.size(), 18);
+ Assert.assertEquals(objs.size(), 19);
// check HelixZkClient Monitors
objs =
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index e954c93..42633c1 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -222,6 +222,8 @@ public class MockHelixAdmin implements HelixAdmin {
_baseDataAccessor
.set(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), new ZNRecord(nodeId),
0);
+ _baseDataAccessor.set(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId),
+ new ZNRecord(nodeId), 0);
_baseDataAccessor
.set(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), new ZNRecord(nodeId),
0);