You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:35 UTC

[helix] 01/03: Controller-side Task Current State Migration (#1550)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a18deb06c28e25e79a9a773069a79ce21a2399b3
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Tue Dec 8 14:31:12 2020 -0800

    Controller-side Task Current State Migration (#1550)
    
    First part of task current state migration.
    All changes made in this PR are on the controller side
    and are directly pipeline-impacting.
---
 .../main/java/org/apache/helix/HelixConstants.java |  1 +
 .../main/java/org/apache/helix/HelixManager.java   | 11 ++++
 .../main/java/org/apache/helix/PropertyKey.java    | 45 +++++++++++++++++
 .../java/org/apache/helix/PropertyPathBuilder.java | 26 ++++++++++
 .../main/java/org/apache/helix/PropertyType.java   |  1 +
 .../listeners/TaskCurrentStateChangeListener.java} | 40 +++++++--------
 .../helix/common/caches/TaskCurrentStateCache.java | 59 ++++++++++++++++++++++
 .../helix/controller/GenericHelixController.java   | 17 +++++++
 .../dataproviders/BaseControllerDataProvider.java  | 35 ++++++++++++-
 .../WorkflowControllerDataProvider.java            | 16 +++++-
 .../helix/controller/stages/ClusterEventType.java  |  1 +
 .../stages/CurrentStateComputationStage.java       |  6 +--
 .../stages/ResourceComputationStage.java           |  5 +-
 .../apache/helix/manager/zk/CallbackHandler.java   | 21 +++++++-
 .../helix/manager/zk/ParticipantManager.java       |  3 ++
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  2 +
 .../helix/manager/zk/ZKHelixDataAccessor.java      |  4 ++
 .../apache/helix/manager/zk/ZKHelixManager.java    |  8 +++
 .../org/apache/helix/TestPropertyPathBuilder.java  |  7 +++
 .../test/java/org/apache/helix/TestZKCallback.java | 23 +++++++++
 .../java/org/apache/helix/common/ZkTestBase.java   |  7 +++
 .../waged/model/AbstractTestClusterModel.java      |  1 +
 .../stages/TestCurrentStateComputationStage.java   | 46 ++++++++++++++++-
 .../stages/TestResourceComputationStage.java       | 28 ++++++++++
 .../integration/TestZkCallbackHandlerLeak.java     | 35 ++++++++++---
 .../manager/TestParticipantManager.java            |  2 +-
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  2 +
 27 files changed, 409 insertions(+), 43 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 0fcdfe6..445b32c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -34,6 +34,7 @@ public interface HelixConstants {
     CLUSTER_CONFIG (PropertyType.CONFIGS),
     LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
     CURRENT_STATE (PropertyType.CURRENTSTATES),
+    TASK_CURRENT_STATE (PropertyType.TASKCURRENTSTATES),
     CUSTOMIZED_STATE_ROOT (PropertyType.CUSTOMIZEDSTATES),
     CUSTOMIZED_STATE (PropertyType.CUSTOMIZEDSTATES),
     MESSAGE (PropertyType.MESSAGES),
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 06a4ec5..4ce3ff9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -230,6 +230,17 @@ public interface HelixManager {
       String sessionId) throws Exception;
 
   /**
+   * Uses CurrentStateChangeListener since TaskCurrentState shares the same CurrentState model
+   * @see CurrentStateChangeListener#onStateChange(String, List, NotificationContext)
+   * @param listener
+   * @param instanceName
+   */
+  default void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId) throws Exception {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
 
    * @see CustomizedStateRootChangeListener#onCustomizedStateRootChange(String, NotificationContext)
    * @param listener
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 6437f1a..0427068 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -74,6 +74,7 @@ import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
 import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
 import static org.apache.helix.PropertyType.TARGETEXTERNALVIEW;
+import static org.apache.helix.PropertyType.TASKCURRENTSTATES;
 
 
 /**
@@ -486,6 +487,50 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link CurrentState} of an instance and session. This key
+     * is for TaskCurrentState specifically.
+     * @param instanceName
+     * @param sessionId
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentStates(String instanceName, String sessionId) {
+      return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+          sessionId);
+    }
+
+    /**
+     * Get a property key associated with {@link CurrentState} of an instance, session, and
+     * job. This key is for TaskCurrentState specifically.
+     * @param instanceName
+     * @param sessionId
+     * @param jobName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName) {
+      return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+          sessionId, jobName);
+    }
+
+    /**
+     * Get a property key associated with {@link CurrentState} of an instance, session, job,
+     * and bucket name. This key is for TaskCurrentState specifically.
+     * @param instanceName
+     * @param sessionId
+     * @param jobName
+     * @param bucketName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentState(String instanceName, String sessionId, String jobName,
+        String bucketName) {
+      if (bucketName == null) {
+        return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+            sessionId, jobName);
+      }
+      return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+          sessionId, jobName, bucketName);
+    }
+
+    /**
      * Get a property key associated with the root of {@link CustomizedState} of an instance
      * @param instanceName
      * @return {@link PropertyKey}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index d4cec13..49e765d 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -54,6 +54,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
 import static org.apache.helix.PropertyType.PAUSE;
 import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
+import static org.apache.helix.PropertyType.TASKCURRENTSTATES;
 import static org.apache.helix.PropertyType.WORKFLOWCONTEXT;
 
 
@@ -125,6 +126,14 @@ public class PropertyPathBuilder {
         "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}");
     addEntry(PropertyType.CURRENTSTATES, 5,
         "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}/{bucketName}");
+    addEntry(TASKCURRENTSTATES, 2,
+        "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES");
+    addEntry(TASKCURRENTSTATES, 3,
+        "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}");
+    addEntry(TASKCURRENTSTATES, 4,
+        "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}");
+    addEntry(TASKCURRENTSTATES, 5,
+        "/{clusterName}/INSTANCES/{instanceName}/TASKCURRENTSTATES/{sessionId}/{resourceName}/{bucketName}");
     addEntry(PropertyType.CUSTOMIZEDSTATES, 2,
         "/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES");
     addEntry(PropertyType.CUSTOMIZEDSTATES, 3,
@@ -346,6 +355,23 @@ public class PropertyPathBuilder {
         sessionId, resourceName);
   }
 
+  public static String instanceTaskCurrentState(String clusterName, String instanceName) {
+    return String.format("/%s/INSTANCES/%s/TASKCURRENTSTATES", clusterName, instanceName);
+  }
+
+  public static String instanceTaskCurrentState(String clusterName, String instanceName,
+      String sessionId) {
+    return String
+        .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s", clusterName, instanceName, sessionId);
+  }
+
+  public static String instanceTaskCurrentState(String clusterName, String instanceName,
+      String sessionId, String resourceName) {
+    return String
+        .format("/%s/INSTANCES/%s/TASKCURRENTSTATES/%s/%s", clusterName, instanceName, sessionId,
+            resourceName);
+  }
+
   public static String instanceCustomizedState(String clusterName, String instanceName) {
     return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES", clusterName, instanceName);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 9ce27f3..bedf79e 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -52,6 +52,7 @@ public enum PropertyType {
   // INSTANCE PROPERTIES
   MESSAGES(Type.INSTANCE, true, true, true),
   CURRENTSTATES(Type.INSTANCE, true, true, false, false, true),
+  TASKCURRENTSTATES(Type.INSTANCE, true, true, false, false, true),
   STATUSUPDATES(Type.INSTANCE, true, true, false, false, false, true),
   ERRORS(Type.INSTANCE, true, true),
   INSTANCE_HISTORY(Type.INSTANCE, true, true, true),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
copy to helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
index 4c8a60e..5f00eae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/api/listeners/TaskCurrentStateChangeListener.java
@@ -1,4 +1,4 @@
-package org.apache.helix.controller.stages;
+package org.apache.helix.api.listeners;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,24 +19,22 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-public enum ClusterEventType {
-  IdealStateChange,
-  CurrentStateChange,
-  CustomizedStateChange,
-  ConfigChange,
-  ClusterConfigChange,
-  ResourceConfigChange,
-  InstanceConfigChange,
-  CustomizeStateConfigChange,
-  LiveInstanceChange,
-  MessageChange,
-  ExternalViewChange,
-  CustomizedViewChange,
-  TargetExternalViewChange,
-  Resume,
-  PeriodicalRebalance,
-  OnDemandRebalance,
-  RetryRebalance,
-  StateVerifier,
-  Unknown
+import java.util.List;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.CurrentState;
+
+/**
+ * Interface to implement to respond to changes in task current states
+ */
+public interface TaskCurrentStateChangeListener {
+
+  /**
+   * Invoked when task current states change
+   * @param instanceName name of the instance whose states changed
+   * @param statesInfo a list of the task current states
+   * @param changeContext the change event and state
+   */
+  void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+      NotificationContext changeContext);
 }
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java
new file mode 100644
index 0000000..936f6d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskCurrentStateCache.java
@@ -0,0 +1,59 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.controllers.ControlContextProvider;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+
+
+/**
+ * Cache to hold all task CurrentStates of a cluster.
+ */
+public class TaskCurrentStateCache extends ParticipantStateCache<CurrentState> {
+  public TaskCurrentStateCache(ControlContextProvider controlContextProvider) {
+    super(controlContextProvider);
+  }
+
+  @Override
+  protected Set<PropertyKey> PopulateParticipantKeys(HelixDataAccessor accessor,
+      Map<String, LiveInstance> liveInstanceMap) {
+    Set<PropertyKey> participantStateKeys = new HashSet<>();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    for (String instanceName : liveInstanceMap.keySet()) {
+      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      String sessionId = liveInstance.getEphemeralOwner();
+      List<String> currentStateNames =
+          accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
+      for (String currentStateName : currentStateNames) {
+        participantStateKeys
+            .add(keyBuilder.taskCurrentState(instanceName, sessionId, currentStateName));
+      }
+    }
+    return participantStateKeys;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3673a07..11ca5a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -58,6 +58,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
@@ -128,6 +129,7 @@ import static org.apache.helix.HelixConstants.ChangeType;
  */
 public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener,
                                                MessageListener, CurrentStateChangeListener,
+                                               TaskCurrentStateChangeListener,
                                                CustomizedStateRootChangeListener,
                                                CustomizedStateChangeListener,
     CustomizedStateConfigChangeListener, ControllerChangeListener,
@@ -569,6 +571,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           rebalancePipeline);
       registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
           rebalancePipeline);
+      registry.register(ClusterEventType.TaskCurrentStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
       registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
           rebalancePipeline);
       registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
@@ -905,6 +909,17 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
   @Override
   @PreFetch(enabled = false)
+  public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+      NotificationContext changeContext) {
+    logger.info("START: GenericClusterController.onTaskCurrentStateChange()");
+    notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE);
+    pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections
+        .<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
+    logger.info("END: GenericClusterController.onTaskCurrentStateChange()");
+  }
+
+  @Override
+  @PreFetch(enabled = false)
   public void onCustomizedStateRootChange(String instanceName, List<String> customizedStateTypes,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onCustomizedStateRootChange()");
@@ -1243,6 +1258,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
             // remove current-state listener for expired session
             String instanceName = lastSessions.get(session).getInstanceName();
             manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+            manager.removeListener(keyBuilder.taskCurrentStates(instanceName, session), this);
           }
         }
       }
@@ -1264,6 +1280,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           try {
             // add current-state listeners for new sessions
             manager.addCurrentStateChangeListener(this, instanceName, session);
+            manager.addTaskCurrentStateChangeListener(this, instanceName, session);
             logger.info(manager.getInstanceName() + " added current-state listener for instance: "
                 + instanceName + ", session: " + session + ", listener: " + this);
           } catch (Exception e) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index c2c9078..070c70a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.PropertyCache;
+import org.apache.helix.common.caches.TaskCurrentStateCache;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
@@ -58,6 +59,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   // Special caches
   private CurrentStateCache _currentStateCache;
+  protected TaskCurrentStateCache _taskCurrentStateCache;
   private InstanceMessagesCache _instanceMessagesCache;
 
   // Other miscellaneous caches
@@ -226,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       }
     }, false);
     _currentStateCache = new CurrentStateCache(this);
+    _taskCurrentStateCache = new TaskCurrentStateCache(this);
     _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
@@ -572,13 +576,40 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   /**
    * Provides the current state of the node for a given session id, the sessionid can be got from
-   * LiveInstance
+   * LiveInstance. This function is only called from the regular pipelines.
    * @param instanceName
    * @param clientSessionId
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
-    return _currentStateCache.getParticipantState(instanceName, clientSessionId);
+    return getCurrentState(instanceName, clientSessionId, false);
+  }
+
+  /**
+   * Provides the current state of the node for a given session id, the sessionid can be got from
+   * LiveInstance
+   * @param instanceName
+   * @param clientSessionId
+   * @return
+   */
+  public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId,
+      boolean isTaskPipeline) {
+    Map<String, CurrentState> regularCurrentStates =
+        _currentStateCache.getParticipantState(instanceName, clientSessionId);
+    if (isTaskPipeline) {
+      // TODO: Targeted jobs still rely on regular resource current states, so need to include all
+      // resource current states without filtering. For now, allow regular current states to
+      // overwrite task current states in case of name conflicts, which are unlikely. Eventually,
+      // it should be completely split.
+      Map<String, CurrentState> mergedCurrentStates = new HashMap<>();
+      mergedCurrentStates
+          .putAll(_taskCurrentStateCache.getParticipantState(instanceName, clientSessionId));
+      mergedCurrentStates.putAll(regularCurrentStates);
+      return Collections.unmodifiableMap(mergedCurrentStates);
+    }
+    return regularCurrentStates.entrySet().stream().filter(
+        entry -> !TaskConstants.STATE_MODEL_NAME.equals(entry.getValue().getStateModelDefRef()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 45e1319..96894e8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -26,6 +26,8 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.common.caches.TaskCurrentStateCache;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.TaskDataCache;
@@ -78,14 +80,15 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
         // This check (and set) is necessary for now since the current state flag in
         // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.TASK_CURRENT_STATE).getAndSet(false)
             || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
-            || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
     Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+    _taskCurrentStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap());
 
     refreshClusterStateChangeFlags(propertyRefreshed);
 
@@ -252,6 +255,17 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
     return _existsLiveInstanceOrCurrentStateOrMessageChange;
   }
 
+  /**
+   * For a certain session, return the task current states on the node.
+   * @param instanceName
+   * @param clientSessionId
+   * @return A mapping of resource names to CurrentStates
+   */
+  public Map<String, CurrentState> getTaskCurrentState(String instanceName,
+      String clientSessionId) {
+    return _taskCurrentStateCache.getParticipantState(instanceName, clientSessionId);
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = genCacheContentStringBuilder();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 4c8a60e..cd0ce60 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
 public enum ClusterEventType {
   IdealStateChange,
   CurrentStateChange,
+  TaskCurrentStateChange,
   CustomizedStateChange,
   ConfigChange,
   ClusterConfigChange,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4af7e27..49e5d8f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -89,9 +89,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       String instanceSessionId = instance.getEphemeralOwner();
 
       // update current states.
-      Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
-          instanceSessionId);
-      updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
+      updateCurrentStates(instance,
+          cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline).values(),
+          currentStateOutput, resourceMap);
 
       Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
       // update pending messages
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 7c2cac6..1f77fa6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -190,10 +190,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
         String clientSessionId = instance.getEphemeralOwner();
 
         Map<String, CurrentState> currentStateMap =
-            cache.getCurrentState(instanceName, clientSessionId);
-        if (currentStateMap == null || currentStateMap.size() == 0) {
-          continue;
-        }
+            cache.getCurrentState(instanceName, clientSessionId, isTaskCache);
         for (CurrentState currentState : currentStateMap.values()) {
 
           String resourceName = currentState.getResourceName();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 03c5f73..24b42af 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -61,6 +61,7 @@ import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -102,6 +103,7 @@ import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
 import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
 import static org.apache.helix.HelixConstants.ChangeType.RESOURCE_CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.TASK_CURRENT_STATE;
 
 @PreFetchChangedData(enabled = false)
 public class CallbackHandler implements IZkChildListener, IZkDataListener {
@@ -261,6 +263,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       case CURRENT_STATE:
         listenerClass = CurrentStateChangeListener.class;
         break;
+      case TASK_CURRENT_STATE:
+        listenerClass = TaskCurrentStateChangeListener.class;
+        break;
       case CUSTOMIZED_STATE_ROOT:
         listenerClass = CustomizedStateRootChangeListener.class;
         break;
@@ -415,6 +420,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         List<CurrentState> currentStates = preFetch(_propertyKey);
         currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
 
+      } else if (_changeType == TASK_CURRENT_STATE) {
+        TaskCurrentStateChangeListener taskCurrentStateChangeListener =
+            (TaskCurrentStateChangeListener) _listener;
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        List<CurrentState> currentStates = preFetch(_propertyKey);
+        taskCurrentStateChangeListener
+            .onTaskCurrentStateChange(instanceName, currentStates, changeContext);
+
       } else if (_changeType == CUSTOMIZED_STATE_ROOT) {
         CustomizedStateRootChangeListener customizedStateRootChangeListener =
             (CustomizedStateRootChangeListener) _listener;
@@ -527,8 +540,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       _zkClient.unsubscribeChildChanges(path, this);
     }
 
-    // List of children could be empty, but won't be null.
-    return _zkClient.getChildren(path);
+    try {
+      return _zkClient.getChildren(path);
+    } catch (ZkNoNodeException e) {
+      return null;
+    }
   }
 
   private void subscribeDataChange(String path, NotificationContext.Type callbackType) {
@@ -571,6 +587,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         try {
           switch (_changeType) {
             case CURRENT_STATE:
+            case TASK_CURRENT_STATE:
             case CUSTOMIZED_STATE:
             case IDEAL_STATE:
             case EXTERNAL_VIEW:
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 937fb4b..3a58570 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -418,6 +418,9 @@ public class ParticipantManager {
       String path = _keyBuilder.currentStates(_instanceName, session).getPath();
       LOG.info("Removing current states from previous sessions. path: " + path);
       _zkclient.deleteRecursively(path);
+      path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath();
+      LOG.info("Removing task current states from previous sessions. path: " + path);
+      _zkclient.deleteRecursively(path);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 7b946b1..df5abe4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -193,6 +193,8 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true);
+    _zkClient
+        .createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 35cc663..203c6c7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -186,6 +186,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
     boolean success = false;
     switch (type) {
       case CURRENTSTATES:
+      case TASKCURRENTSTATES:
       case CUSTOMIZEDSTATES:
         success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
         break;
@@ -243,6 +244,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
       switch (type) {
       case CURRENTSTATES:
+      case TASKCURRENTSTATES:
       case IDEALSTATES:
       case EXTERNALVIEW:
         // check if bucketized
@@ -301,6 +303,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
     switch (type) {
     case CURRENTSTATES:
+    case TASKCURRENTSTATES:
     case IDEALSTATES:
     case EXTERNALVIEW:
       // check if bucketized
@@ -421,6 +424,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
       for (ZNRecord record : children) {
         switch (type) {
         case CURRENTSTATES:
+        case TASKCURRENTSTATES:
         case IDEALSTATES:
         case EXTERNALVIEW:
           if (record != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 05e1e26..04f4385 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -591,6 +591,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   @Override
+  public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId) throws Exception {
+    addListener(listener, new Builder(_clusterName).taskCurrentStates(instanceName, sessionId),
+        ChangeType.TASK_CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged
+        });
+  }
+
+  @Override
   public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
       String instanceName) throws Exception {
     addListener(listener, new Builder(_clusterName).customizedStatesRoot(instanceName),
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
index 1f8df6c..422fb9c 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
@@ -39,6 +39,13 @@ public class TestPropertyPathBuilder {
     actual = PropertyPathBuilder.instanceCurrentState("test_cluster", "instanceName1", "sessionId");
     AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId");
 
+    actual = PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES");
+    actual =
+        PropertyPathBuilder.instanceTaskCurrentState("test_cluster", "instanceName1", "sessionId");
+    AssertJUnit
+        .assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/TASKCURRENTSTATES/sessionId");
+
     actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1");
     AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CUSTOMIZEDSTATES");
     actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1", "customizedState1");
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index d97471c..5dc0171 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -32,6 +32,7 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.ExternalView;
@@ -58,6 +59,7 @@ public class TestZKCallback extends ZkUnitTestBase {
 
   public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
                                                ConfigChangeListener, CurrentStateChangeListener,
+                                               TaskCurrentStateChangeListener,
                                                CustomizedStateConfigChangeListener,
                                                CustomizedStateRootChangeListener,
                                                ExternalViewChangeListener,
@@ -66,6 +68,7 @@ public class TestZKCallback extends ZkUnitTestBase {
     boolean liveInstanceChangeReceived = false;
     boolean configChangeReceived = false;
     boolean currentStateChangeReceived = false;
+    boolean taskCurrentStateChangeReceived = false;
     boolean customizedStateConfigChangeReceived = false;
     boolean customizedStateRootChangeReceived = false;
     boolean messageChangeReceived = false;
@@ -84,6 +87,12 @@ public class TestZKCallback extends ZkUnitTestBase {
     }
 
     @Override
+    public void onTaskCurrentStateChange(String instanceName, List<CurrentState> statesInfo,
+        NotificationContext changeContext) {
+      taskCurrentStateChangeReceived = true;
+    }
+
+    @Override
     public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
       configChangeReceived = true;
     }
@@ -149,6 +158,8 @@ public class TestZKCallback extends ZkUnitTestBase {
       testHelixManager.addMessageListener(testListener, "localhost_8900");
       testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
           testHelixManager.getSessionId());
+      testHelixManager.addTaskCurrentStateChangeListener(testListener, "localhost_8900",
+          testHelixManager.getSessionId());
       testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900");
       testHelixManager.addConfigChangeListener(testListener);
       testHelixManager.addIdealStateChangeListener(testListener);
@@ -184,6 +195,18 @@ public class TestZKCallback extends ZkUnitTestBase {
       Assert.assertTrue(result);
       testListener.Reset();
 
+      CurrentState taskCurState = new CurrentState("db-12345");
+      taskCurState.setSessionId("sessionId");
+      taskCurState.setStateModelDefRef("StateModelDef");
+      accessor.setProperty(keyBuilder
+          .taskCurrentState("localhost_8900", testHelixManager.getSessionId(),
+              taskCurState.getId()), taskCurState);
+      result = TestHelper.verify(() -> {
+        return testListener.taskCurrentStateChangeReceived;
+      }, TestHelper.WAIT_DURATION);
+      Assert.assertTrue(result);
+      testListener.Reset();
+
       IdealState idealState = new IdealState("db-1234");
       idealState.setNumPartitions(400);
       idealState.setReplicas(Integer.toString(2));
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 0b1c375..e832307 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -929,6 +929,13 @@ public class ZkTestBase {
             LOG.error("Current state not empty for " + participant);
             return false;
           }
+          CurrentState taskCurrentState =
+              accessor.getProperty(keyBuilder.taskCurrentState(participant, sessionId, _resourceName));
+          Map<String, String> taskPartitionStateMap = taskCurrentState.getPartitionStateMap();
+          if (taskPartitionStateMap != null && !taskPartitionStateMap.isEmpty()) {
+            LOG.error("Task current state not empty for " + participant);
+            return false;
+          }
         }
       }
       return true;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 8887e87..e3b346d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -154,6 +154,7 @@ public abstract class AbstractTestClusterModel {
     currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
     currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
     when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
 
     // 5. Set up the resource config for the two resources with the partition weight.
     Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 9658e2a..91e275a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -19,9 +19,11 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.CurrentState;
@@ -99,11 +101,28 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     stateWithDeadSession.setStateModelDefRef("MasterSlave");
     stateWithDeadSession.setState("testResourceName_1", "MASTER");
 
+    ZNRecord record3 = new ZNRecord("testTaskResourceName");
+    CurrentState taskStateWithLiveSession = new CurrentState(record3);
+    taskStateWithLiveSession.setSessionId("session_3");
+    taskStateWithLiveSession.setStateModelDefRef("Task");
+    taskStateWithLiveSession.setState("testTaskResourceName_1", "INIT");
+    ZNRecord record4 = new ZNRecord("testTaskResourceName");
+    CurrentState taskStateWithDeadSession = new CurrentState(record4);
+    taskStateWithDeadSession.setSessionId("session_dead");
+    taskStateWithDeadSession.setStateModelDefRef("Task");
+    taskStateWithDeadSession.setState("testTaskResourceName_1", "INIT");
+
     accessor.setProperty(keyBuilder.currentState("localhost_3", "session_3", "testResourceName"),
         stateWithLiveSession);
-    accessor.setProperty(
-        keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
+    accessor.setProperty(keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
         stateWithDeadSession);
+    accessor.setProperty(
+        keyBuilder.taskCurrentState("localhost_3", "session_3", "testTaskResourceName"),
+        taskStateWithLiveSession);
+    accessor.setProperty(
+        keyBuilder.taskCurrentState("localhost_3", "session_dead", "testTaskResourceName"),
+        taskStateWithDeadSession);
+
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
     CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -111,6 +130,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
         output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
             "localhost_3");
     AssertJUnit.assertEquals(currentState, "OFFLINE");
+    // Non Task Framework event will cause task current states to be ignored
+    String taskCurrentState = output3
+        .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
+            "localhost_3");
+    AssertJUnit.assertNull(taskCurrentState);
 
     // Add another state transition message which is stale
     message = new Message(Message.MessageType.STATE_TRANSITION, "msg2");
@@ -128,6 +152,24 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     AssertJUnit.assertEquals(dataCache.getStaleMessages().size(), 1);
     AssertJUnit.assertTrue(dataCache.getStaleMessages().containsKey("localhost_3"));
     AssertJUnit.assertTrue(dataCache.getStaleMessages().get("localhost_3").containsKey("msg2"));
+
+    // Use a task event to check that task current states are included
+    resourceMap = new HashMap<String, Resource>();
+    Resource testTaskResource = new Resource("testTaskResourceName");
+    testTaskResource.setStateModelDefRef("Task");
+    testTaskResource.addPartition("testTaskResourceName_1");
+    resourceMap.put("testTaskResourceName", testTaskResource);
+    ClusterEvent taskEvent = new ClusterEvent(ClusterEventType.Unknown);
+    taskEvent.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    taskEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new WorkflowControllerDataProvider());
+    runStage(taskEvent, new ReadClusterDataStage());
+    runStage(taskEvent, stage);
+    CurrentStateOutput output5 = taskEvent.getAttribute(AttributeName.CURRENT_STATE.name());
+    taskCurrentState = output5
+        .getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
+            "localhost_3");
+    AssertJUnit.assertEquals(taskCurrentState, "INIT");
   }
 
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 440543e..6b6f28e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.StageContext;
@@ -154,6 +155,14 @@ public class TestResourceComputationStage extends BaseStageTest {
     accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
         currentState);
 
+    String oldTaskResource = "testTaskResourceOld";
+    CurrentState taskCurrentState = new CurrentState(oldTaskResource);
+    taskCurrentState.setState("testTaskResourceOld_0", "RUNNING");
+    taskCurrentState.setState("testTaskResourceOld_1", "FINISHED");
+    taskCurrentState.setStateModelDefRef("Task");
+    accessor.setProperty(keyBuilder.taskCurrentState(instanceName, sessionId, oldTaskResource),
+        taskCurrentState);
+
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
     ResourceComputationStage stage = new ResourceComputationStage();
@@ -185,6 +194,25 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
     AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
 
+
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new WorkflowControllerDataProvider());
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+
+    resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    // +2 because it will have current state and task current state
+    AssertJUnit.assertEquals(resources.length + 2, resourceMap.size());
+
+    Resource taskResource = resourceMap.get(oldTaskResource);
+    AssertJUnit.assertNotNull(taskResource);
+    AssertJUnit.assertEquals(taskResource.getResourceName(), oldTaskResource);
+    AssertJUnit
+        .assertEquals(taskResource.getStateModelDefRef(), taskCurrentState.getStateModelDefRef());
+    AssertJUnit.assertEquals(taskResource.getPartitions().size(),
+        taskCurrentState.getPartitionStateMap().size());
+    AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_0"));
+    AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_1"));
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 2f27d4b..7bc92dc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
@@ -43,6 +44,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.slf4j.Logger;
@@ -62,6 +64,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     String clusterName = className + "_" + methodName;
     final int n = 2;
     final int r = 2;
+    final int taskResourceCount = 2;
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -78,6 +81,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     // start participants
     MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
@@ -85,6 +89,13 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
+
+      // Manually set up task current states
+      for (int j = 0; j < taskResourceCount; j++) {
+        _baseAccessor.create(keyBuilder
+            .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j)
+            .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT);
+      }
     }
 
     ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
@@ -105,7 +116,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         // System.out.println("controller watch paths: " + watchPaths);
 
         // where n is number of nodes and r is number of resources
-        return watchPaths.size() == (8 + r + ( 5 + r) * n);
+        return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
       }
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of zk-watchers.");
@@ -130,8 +141,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 14,
-        "HelixController should have 14 (8+3n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
+        "HelixController should have 16 (8+4n) callback handlers for 2 (n) participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
@@ -160,7 +171,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         // System.out.println("controller watch paths after session expiry: " + watchPaths);
 
         // where n is number of nodes and r is number of resources
-        return watchPaths.size() == (8 + r + ( 5 + r) * n);
+        // one participant is disconnected, and its task current states are removed
+        return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n - 1) + 6 + r);
       }
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of zk-watchers after session expiry.");
@@ -208,6 +220,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     String clusterName = className + "_" + methodName;
     final int n = 2;
     final int r = 1;
+    final int taskResourceCount = 1;
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -224,6 +237,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     // start participants
     MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
@@ -231,6 +245,12 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
+      // Manually set up task current states
+      for (int j = 0; j < taskResourceCount; j++) {
+        _baseAccessor.create(keyBuilder
+            .taskCurrentState(instanceName, participants[i].getSessionId(), "TestTaskResource_" + j)
+            .toString(), new ZNRecord("TestTaskResource_" + j), AccessOption.PERSISTENT);
+      }
     }
 
     ZkHelixClusterVerifier verifier =
@@ -256,8 +276,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 8 + 3 * n,
-        "HelixController should have 14 (8+3n) callback handlers for 2 participant, but was "
+    Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
+        "HelixController should have 16 (8+4n) callback handlers for 2 participant, but was "
             + controllerHandlerNb + ", " + printHandlers(controller));
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
@@ -285,7 +305,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         System.err.println("controller watch paths after session expiry: " + watchPaths.size());
 
         // where r is number of resources and n is number of nodes
-        int expected = (8 + r + (5 + r) * n);
+        // task resource count does not attribute to ideal state watch paths
+        int expected = (8 + r + (6 + r + taskResourceCount) * n);
         return watchPaths.size() == expected;
       }
     }, 2000);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 25523d0..050bc76 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -210,7 +210,7 @@ public class TestParticipantManager extends ZkTestBase {
     // check HelixCallback Monitor
     Set<ObjectInstance> objs =
         _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null);
-    Assert.assertEquals(objs.size(), 18);
+    Assert.assertEquals(objs.size(), 19);
 
     // check HelixZkClient Monitors
     objs =
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index e954c93..42633c1 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -222,6 +222,8 @@ public class MockHelixAdmin implements HelixAdmin {
     _baseDataAccessor
         .set(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), new ZNRecord(nodeId),
             0);
+    _baseDataAccessor.set(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId),
+        new ZNRecord(nodeId), 0);
     _baseDataAccessor
         .set(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), new ZNRecord(nodeId),
             0);