You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/20 18:39:53 UTC

[1/2] helix git commit: Enable our scripts to be able to pump log lines into the console.

Repository: helix
Updated Branches:
  refs/heads/master 0e8490353 -> b3ee27d4a


Enable our scripts to be able to pump log lines into the console.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b3ee27d4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b3ee27d4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b3ee27d4

Branch: refs/heads/master
Commit: b3ee27d4a663d4b586191bc1b23e9c0d90cf6a1f
Parents: ae02b58
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 8 10:51:31 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Mar 20 11:31:47 2018 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                          |  1 +
 helix-core/src/main/config/log4j.properties | 21 ++++++++++-----------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b3ee27d4/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b94a108..b6a2539 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -189,6 +189,7 @@ under the License.
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>appassembler-maven-plugin</artifactId>
         <configuration>
+          <extraJvmArguments>-Dlog4j.configuration=file://"$BASEDIR"/conf/log4j.properties</extraJvmArguments>
           <programs>
             <program>
               <mainClass>org.apache.helix.controller.HelixControllerMain</mainClass>

http://git-wip-us.apache.org/repos/asf/helix/blob/b3ee27d4/helix-core/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-core/src/main/config/log4j.properties b/helix-core/src/main/config/log4j.properties
index 4b3dc31..5c183a1 100644
--- a/helix-core/src/main/config/log4j.properties
+++ b/helix-core/src/main/config/log4j.properties
@@ -16,16 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+# Set root logger level to INFO and its only appender to console.
+log4j.rootLogger=INFO, console
 
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=ERROR,A1
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Target=System.out
+log4j.appender.console.threshold=INFO
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
 
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-log4j.logger.org.I0Itec=ERROR
-log4j.logger.org.apache=ERROR
+log4j.logger.org.I0Itec=INFO
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.helix=INFO


[2/2] helix git commit: Refactor ClusterDataCache, break it into small cache components, including CurrentStateCache, InstanceMessageCache and TaskDataCache, and put the refresh logic into each cache component itself.

Posted by jx...@apache.org.
Refactor ClusterDataCache, break it into small cache components, including CurrentStateCache, InstanceMessageCache and TaskDataCache, and put the refresh logic into each cache component itself.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ae02b58d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ae02b58d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ae02b58d

Branch: refs/heads/master
Commit: ae02b58dc4880fe1f63bc66e0fdf93a63fd1d03f
Parents: 0e84903
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Feb 5 17:26:24 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Mar 20 11:31:47 2018 -0700

----------------------------------------------------------------------
 .../helix/common/BasicClusterDataCache.java     | 180 ---------
 .../common/caches/BasicClusterDataCache.java    | 199 +++++++++
 .../helix/common/caches/CurrentStateCache.java  | 206 ++++++++++
 .../common/caches/InstanceMessagesCache.java    | 222 ++++++++++
 .../helix/common/caches/TaskDataCache.java      | 232 +++++++++++
 .../controller/stages/ClusterDataCache.java     | 402 +++----------------
 .../helix/spectator/RoutingDataCache.java       |   2 +-
 7 files changed, 912 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
deleted file mode 100644
index d48cfbb..0000000
--- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package org.apache.helix.common;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Cache the cluster data
- */
-public class BasicClusterDataCache {
-  protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
-
-  private Map<String, LiveInstance> _liveInstanceMap;
-  private Map<String, InstanceConfig> _instanceConfigMap;
-  private Map<String, ExternalView> _externalViewMap;
-  private final PropertyType _sourceDataType;
-
-  protected String _clusterName;
-
-  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
-
-  public BasicClusterDataCache(String clusterName) {
-    this(clusterName, PropertyType.EXTERNALVIEW);
-  }
-
-  public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
-    _propertyDataChangedMap = new ConcurrentHashMap<>();
-    _liveInstanceMap = new HashMap<>();
-    _instanceConfigMap = new HashMap<>();
-    _externalViewMap = new HashMap<>();
-    _clusterName = clusterName;
-    _sourceDataType = sourceDataType;
-  }
-
-  /**
-   * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way
-   *
-   * @param accessor
-   *
-   * @return
-   */
-  public synchronized void refresh(HelixDataAccessor accessor) {
-    LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName);
-    long startTime = System.currentTimeMillis();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
-      long start = System.currentTimeMillis();
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
-      switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
-          break;
-        case TARGETEXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
-          break;
-        default:
-          break;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
-            System.currentTimeMillis() - start) + " ms");
-      }
-    }
-
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
-      _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-      LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
-    }
-
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      _propertyDataChangedMap
-          .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
-      _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-      LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
-    }
-
-    long endTime = System.currentTimeMillis();
-    LOG.info(
-        "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
-            - startTime) + " ms");
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
-      for (LiveInstance instance : _liveInstanceMap.values()) {
-        LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
-      }
-      LOG.debug("ExternalViews: " + _externalViewMap.keySet());
-      LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
-    }
-  }
-
-  /**
-   * Retrieves the ExternalView for all resources
-   *
-   * @return
-   */
-  public Map<String, ExternalView> getExternalViews() {
-    return Collections.unmodifiableMap(_externalViewMap);
-  }
-
-  /**
-   * Returns the LiveInstances for each of the instances that are curretnly up and running
-   *
-   * @return
-   */
-  public Map<String, LiveInstance> getLiveInstances() {
-    return Collections.unmodifiableMap(_liveInstanceMap);
-  }
-
-  /**
-   * Returns the instance config map
-   *
-   * @return
-   */
-  public Map<String, InstanceConfig> getInstanceConfigMap() {
-    return Collections.unmodifiableMap(_instanceConfigMap);
-  }
-
-  /**
-   * Notify the cache that some part of the cluster data has been changed.
-   */
-  public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
-    _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
-  }
-
-  /**
-   * Indicate that a full read should be done on the next refresh
-   */
-  public synchronized void requireFullRefresh() {
-    for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
-      _propertyDataChangedMap.put(type, Boolean.valueOf(true));
-    }
-  }
-
-  /**
-   * toString method to print the data cache state
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
-    sb.append("externalViewMap:" + _externalViewMap).append("\n");
-    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
-
-    return sb.toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
new file mode 100644
index 0000000..f470272
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -0,0 +1,199 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache the basic cluster data, including LiveInstances, InstanceConfigs and ExternalViews.
+ */
+public class BasicClusterDataCache {
+  protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+  private Map<String, LiveInstance> _liveInstanceMap;
+  private Map<String, InstanceConfig> _instanceConfigMap;
+  private Map<String, ExternalView> _externalViewMap;
+  private final PropertyType _sourceDataType;
+
+  protected String _clusterName;
+
+  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
+
+  public BasicClusterDataCache(String clusterName) {
+    this(clusterName, PropertyType.EXTERNALVIEW);
+  }
+
+  public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
+    _propertyDataChangedMap = new ConcurrentHashMap<>();
+    _liveInstanceMap = new HashMap<>();
+    _instanceConfigMap = new HashMap<>();
+    _externalViewMap = new HashMap<>();
+    _clusterName = clusterName;
+    _sourceDataType = sourceDataType;
+  }
+
+  /**
+   * Re-fetch from ZooKeeper only the data types (ExternalView, LiveInstance, InstanceConfig)
+   * whose change flag is set, then clear that flag. A type whose flag was never set is treated
+   * as changed, so the first refresh loads everything instead of failing on a missing flag.
+   *
+   * @param accessor accessor used to read the cluster data
+   */
+  public synchronized void refresh(HelixDataAccessor accessor) {
+    LOG.info("START: " + getClass().getSimpleName() + ".refresh() for cluster " + _clusterName);
+    long startTime = System.currentTimeMillis();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    if (!Boolean.FALSE.equals(_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW))) {
+      long start = System.currentTimeMillis();
+      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
+      switch (_sourceDataType) {
+        case EXTERNALVIEW:
+          _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+          break;
+        case TARGETEXTERNALVIEW:
+          _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+          break;
+        default:
+          break;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
+            System.currentTimeMillis() - start) + " ms");
+      }
+    }
+
+    if (!Boolean.FALSE.equals(_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE))) {
+      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
+      _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
+    }
+
+    if (!Boolean.FALSE.equals(_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG))) {
+      _propertyDataChangedMap
+          .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
+      _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
+    }
+
+    long endTime = System.currentTimeMillis();
+    LOG.info(
+        "END: " + getClass().getSimpleName() + ".refresh() for cluster " + _clusterName + ", took "
+            + (endTime - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
+      for (LiveInstance instance : _liveInstanceMap.values()) {
+        LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+      }
+      LOG.debug("ExternalViews: " + _externalViewMap.keySet());
+      LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
+    }
+  }
+
+  /**
+   * Retrieves the ExternalView for all resources
+   *
+   * @return
+   */
+  public Map<String, ExternalView> getExternalViews() {
+    return Collections.unmodifiableMap(_externalViewMap);
+  }
+
+  /**
+   * Returns the LiveInstances for each of the instances that are currently up and running
+   *
+   * @return an unmodifiable view of the cached LiveInstance map
+   */
+  public Map<String, LiveInstance> getLiveInstances() {
+    return Collections.unmodifiableMap(_liveInstanceMap);
+  }
+
+  /**
+   * Returns the instance config map
+   *
+   * @return
+   */
+  public Map<String, InstanceConfig> getInstanceConfigMap() {
+    return Collections.unmodifiableMap(_instanceConfigMap);
+  }
+
+  /**
+   * Notify the cache that some part of the cluster data has been changed.
+   */
+  public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
+    _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
+  }
+
+  /**
+   * Clear the corresponding cache based on change type
+   * @param changeType
+   */
+  public synchronized void clearCache(HelixConstants.ChangeType changeType) {
+    switch (changeType) {
+    case LIVE_INSTANCE:
+      _liveInstanceMap.clear();
+      break;
+    case INSTANCE_CONFIG:
+      _instanceConfigMap.clear();
+      break;
+    case EXTERNAL_VIEW:
+      _externalViewMap.clear();
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Indicate that a full read should be done on the next refresh
+   */
+  public synchronized void requireFullRefresh() {
+    for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
+      _propertyDataChangedMap.put(type, Boolean.valueOf(true));
+    }
+  }
+
+  /**
+   * toString method to print the data cache state
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
+    sb.append("externalViewMap:" + _externalViewMap).append("\n");
+    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+
+    return sb.toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
new file mode 100644
index 0000000..d921512
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -0,0 +1,206 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache to hold all CurrentStates of a cluster.
+ */
+public class CurrentStateCache {
+  private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName());
+
+  private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
+  private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap();
+
+  private String _clusterName;
+
+  public CurrentStateCache(String clusterName) {
+    _clusterName = clusterName;
+    _currentStateMap = Collections.emptyMap();
+  }
+
+  /**
+   * This refreshes the CurrentStates data by re-fetching the data from zookeeper in an efficient
+   * way
+   *
+   * @param accessor
+   * @param liveInstanceMap map of all liveInstances in cluster
+   *
+   * @return
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor,
+      Map<String, LiveInstance> liveInstanceMap) {
+    LOG.info("START: CurrentStateCache.refresh()");
+    long startTime = System.currentTimeMillis();
+
+    refreshCurrentStatesCache(accessor, liveInstanceMap);
+
+    Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>();
+    for (PropertyKey key : _currentStateCache.keySet()) {
+      CurrentState currentState = _currentStateCache.get(key);
+      String[] params = key.getParams();
+      if (currentState != null && params.length >= 4) {
+        String instanceName = params[1];
+        String sessionId = params[2];
+        String stateName = params[3];
+        Map<String, Map<String, CurrentState>> instanceCurStateMap =
+            allCurStateMap.get(instanceName);
+        if (instanceCurStateMap == null) {
+          instanceCurStateMap = Maps.newHashMap();
+          allCurStateMap.put(instanceName, instanceCurStateMap);
+        }
+        Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+        if (sessionCurStateMap == null) {
+          sessionCurStateMap = Maps.newHashMap();
+          instanceCurStateMap.put(sessionId, sessionCurStateMap);
+        }
+        sessionCurStateMap.put(stateName, currentState);
+      }
+    }
+
+    for (String instance : allCurStateMap.keySet()) {
+      allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance)));
+    }
+    _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: CurrentStateCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+        - startTime) + " ms");
+    return true;
+  }
+
+  // Reload into the local cache the current states that are new or have changed in ZooKeeper.
+  private void refreshCurrentStatesCache(HelixDataAccessor accessor,
+      Map<String, LiveInstance> liveInstanceMap) {
+    long start = System.currentTimeMillis();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    // ArrayLists: stats below are matched to keys by index, so get(i) must be O(1).
+    List<PropertyKey> currentStateKeys = Lists.newArrayList();
+    for (String instanceName : liveInstanceMap.keySet()) {
+      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      String sessionId = liveInstance.getSessionId();
+      List<String> currentStateNames =
+          accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+      for (String currentStateName : currentStateNames) {
+        currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+      }
+    }
+
+    // All new entries from zk not cached locally yet should be read from ZK.
+    List<PropertyKey> reloadKeys = Lists.newArrayList(currentStateKeys);
+    reloadKeys.removeAll(_currentStateCache.keySet());
+    // Entries both in ZK and cached locally; checked against the hash-based key set, not a list.
+    List<PropertyKey> cachedKeys = Lists.newArrayList(currentStateKeys);
+    cachedKeys.retainAll(_currentStateCache.keySet());
+
+    List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
+    Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap();
+    for (int i = 0; i < cachedKeys.size(); i++) {
+      PropertyKey key = cachedKeys.get(i);
+      HelixProperty.Stat stat = stats.get(i);
+      if (stat != null) {
+        CurrentState property = _currentStateCache.get(key);
+        if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) {
+          currentStatesMap.put(key, property);
+        } else {
+          // need update from zk
+          reloadKeys.add(key);
+        }
+      } else {
+        LOG.debug("stat is null for key: " + key);
+        reloadKeys.add(key);
+      }
+    }
+
+    List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
+    Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
+    for (CurrentState currentState : currentStates) {
+      PropertyKey key = csKeyIter.next();
+      if (currentState != null) {
+        currentStatesMap.put(key, currentState);
+      } else {
+        LOG.debug("CurrentState null for key: " + key);
+      }
+    }
+
+    _currentStateCache = Collections.unmodifiableMap(currentStatesMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size());
+      LOG.debug("# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size()
+          - reloadKeys.size()));
+    }
+    LOG.info("Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!");
+  }
+
+  /**
+   * Return CurrentStates map for all instances.
+   *
+   * @return
+   */
+  public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
+    return Collections.unmodifiableMap(_currentStateMap);
+  }
+
+  /**
+   * Return all CurrentState on the given instance.
+   *
+   * @param instance
+   *
+   * @return
+   */
+  public Map<String, Map<String, CurrentState>> getCurrentStates(String instance) {
+    if (!_currentStateMap.containsKey(instance)) {
+      return Collections.emptyMap();
+    }
+    return Collections.unmodifiableMap(_currentStateMap.get(instance));
+  }
+
+  /**
+   * Provides the current state of the node for a given session id, the sessionid can be got from
+   * LiveInstance
+   *
+   * @param instance
+   * @param clientSessionId
+   *
+   * @return
+   */
+  public Map<String, CurrentState> getCurrentState(String instance, String clientSessionId) {
+    if (!_currentStateMap.containsKey(instance) || !_currentStateMap.get(instance)
+        .containsKey(clientSessionId)) {
+      return Collections.emptyMap();
+    }
+    return Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
new file mode 100644
index 0000000..9ac40c3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -0,0 +1,222 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache for holding pending messages in all instances in the given cluster.
+ */
+public class InstanceMessagesCache {
+  private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
+  private Map<String, Map<String, Message>> _messageMap;
+
+  // maintain a cache of participant messages across pipeline runs
+  private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+  private String _clusterName;
+
+  public InstanceMessagesCache(String clusterName) {
+    _clusterName = clusterName;
+  }
+
+  /**
+   * This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an
+   * efficient way
+   * current state must be refreshed before refreshing relay messages because we need to use current
+   * state to validate all relay messages.
+   *
+   * @param accessor
+   * @param liveInstanceMap
+   *
+   * @return
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor,
+      Map<String, LiveInstance> liveInstanceMap) {
+    LOG.info("START: InstanceMessagesCache.refresh()");
+    long startTime = System.currentTimeMillis();
+
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, Map<String, Message>> msgMap = new HashMap<>();
+    List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+    long purgeSum = 0;
+    for (String instanceName : liveInstanceMap.keySet()) {
+      // get the cache
+      Map<String, Message> cachedMap = _messageCache.get(instanceName);
+      if (cachedMap == null) {
+        cachedMap = Maps.newHashMap();
+        _messageCache.put(instanceName, cachedMap);
+      }
+      msgMap.put(instanceName, cachedMap);
+
+      // get the current names
+      Set<String> messageNames =
+          Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+      long purgeStart = System.currentTimeMillis();
+      // clear stale names
+      Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+      while (cachedNamesIter.hasNext()) {
+        String messageName = cachedNamesIter.next();
+        if (!messageNames.contains(messageName)) {
+          cachedNamesIter.remove();
+        }
+      }
+      long purgeEnd = System.currentTimeMillis();
+      purgeSum += purgeEnd - purgeStart;
+
+      // get the keys for the new messages
+      for (String messageName : messageNames) {
+        if (!cachedMap.containsKey(messageName)) {
+          newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+        }
+      }
+    }
+
+    // get the new messages
+    if (newMessageKeys.size() > 0) {
+      List<Message> newMessages = accessor.getProperty(newMessageKeys);
+      for (Message message : newMessages) {
+        if (message != null) {
+          Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+          cachedMap.put(message.getId(), message);
+        }
+      }
+    }
+
+    _messageMap = Collections.unmodifiableMap(msgMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Message purge took: " + purgeSum);
+      LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + (
+          System.currentTimeMillis() - startTime) + " ms.");
+    }
+
+    return true;
+  }
+
+  // update all valid relay messages attached to existing state transition messages into message map.
+  public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+    List<Message> relayMessages = new ArrayList<>();
+    for (String instance : _messageMap.keySet()) {
+      Map<String, Message> instanceMessages = _messageMap.get(instance);
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null) {
+        continue;
+      }
+
+      for (Message message : instanceMessages.values()) {
+        if (message.hasRelayMessages()) {
+          String sessionId = message.getTgtSessionId();
+          String resourceName = message.getResourceName();
+          String partitionName = message.getPartitionName();
+          String targetState = message.getToState();
+          String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+          if (!instanceSessionId.equals(sessionId)) {
+            continue;
+          }
+
+          Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+          if (sessionCurrentStateMap == null) {
+            continue;
+          }
+          CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+          if (currentState == null || !targetState.equals(currentState.getState(partitionName))) {
+            continue;
+          }
+          long transitionCompleteTime = currentState.getEndTime(partitionName);
+
+          for (Message msg : message.getRelayMessages().values()) {
+            msg.setRelayTime(transitionCompleteTime);
+            if (!message.isExpired()) {
+              relayMessages.add(msg);
+            }
+          }
+        }
+      }
+    }
+
+    for (Message message : relayMessages) {
+      String instance = message.getTgtName();
+      Map<String, Message> instanceMessages = _messageMap.get(instance);
+      if (instanceMessages == null) {
+        instanceMessages = new HashMap<>();
+        _messageMap.put(instance, instanceMessages);
+      }
+      instanceMessages.put(message.getId(), message);
+    }
+  }
+
+  /**
+   * Provides a list of current outstanding transitions on a given instance.
+   *
+   * @param instanceName
+   *
+   * @return
+   */
+  public Map<String, Message> getMessages(String instanceName) {
+    Map<String, Message> map = _messageMap.get(instanceName);
+    if (map != null) {
+      return map;
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void cacheMessages(List<Message> messages) {
+    for (Message message : messages) {
+      String instanceName = message.getTgtName();
+      Map<String, Message> instMsgMap;
+      if (_messageCache.containsKey(instanceName)) {
+        instMsgMap = _messageCache.get(instanceName);
+      } else {
+        instMsgMap = Maps.newHashMap();
+        _messageCache.put(instanceName, instMsgMap);
+      }
+      instMsgMap.put(message.getId(), message);
+    }
+  }
+
+  @Override public String toString() {
+    return "InstanceMessagesCache{" +
+        "_messageMap=" + _messageMap +
+        ", _messageCache=" + _messageCache +
+        ", _clusterName='" + _clusterName + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
new file mode 100644
index 0000000..2dbb4f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -0,0 +1,232 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache for holding all task related cluster data, such as WorkflowConfig, JobConfig and Contexts.
+ */
+public class TaskDataCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskDataCache.class.getName());
+  // Simple field of a context record whose value is used as that context's key in _contextMap.
+  private static final String NAME = "NAME";
+
+  private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
+  private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
+  // Raw workflow/job context records; a value may be null when its context znode had no data.
+  private Map<String, ZNRecord> _contextMap = new HashMap<>();
+
+  private String _clusterName;
+
+  public TaskDataCache(String clusterName) {
+    _clusterName = clusterName;
+  }
+
+  /**
+   * Refreshes the task data: re-reads all workflow/job contexts from ZooKeeper and rebuilds the
+   * workflow and job config maps from the given resource configs.
+   *
+   * @param accessor data accessor used to read the contexts
+   * @param resourceConfigMap resource configs keyed by resource name. A config whose record has a
+   *          {@code Dag} simple field becomes a WorkflowConfig; otherwise one with a
+   *          {@code WorkflowID} simple field becomes a JobConfig; any other config is ignored.
+   *
+   * @return always {@code true}
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor,
+      Map<String, ResourceConfig> resourceConfigMap) {
+    refreshJobContexts(accessor);
+
+    // update workflow and job configs.
+    _workflowConfigMap.clear();
+    _jobConfigMap.clear();
+    for (Map.Entry<String, ResourceConfig> entry : resourceConfigMap.entrySet()) {
+      Map<String, String> simpleFields = entry.getValue().getRecord().getSimpleFields();
+      if (simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
+        _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue()));
+      } else if (simpleFields
+          .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
+        _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Re-reads every context record under the cluster's property-store context root into
+   * _contextMap. A record is keyed by its NAME simple field, or by its child znode name when the
+   * record is null or has no NAME. When the cluster name is unknown the map is only cleared.
+   */
+  private void refreshJobContexts(HelixDataAccessor accessor) {
+    // TODO: Need an optimize for reading context only if the refresh is needed.
+    long start = System.currentTimeMillis();
+    _contextMap.clear();
+    if (_clusterName == null) {
+      return;
+    }
+    String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
+        TaskConstants.REBALANCER_CONTEXT_ROOT);
+    List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
+    if (childNames == null) {
+      return;
+    }
+    Joiner pathJoiner = Joiner.on("/");
+    List<String> contextPaths = new ArrayList<>(childNames.size());
+    for (String context : childNames) {
+      contextPaths.add(pathJoiner.join(path, context, TaskConstants.CONTEXT_NODE));
+    }
+
+    // assumes one result per requested path, in request order (null when a path has no data)
+    List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
+    for (int i = 0; i < contexts.size(); i++) {
+      ZNRecord context = contexts.get(i);
+      String contextName = context == null ? null : context.getSimpleField(NAME);
+      if (contextName != null) {
+        _contextMap.put(contextName, context);
+      } else {
+        _contextMap.put(childNames.get(i), context);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              String.format("Context for %s is null or miss the context NAME!", childNames.get(i)));
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
+          System.currentTimeMillis() - start) + " ms");
+    }
+  }
+
+  /**
+   * Returns job config map
+   *
+   * @return the cache's own (mutable) map of job configs keyed by resource name
+   */
+  public Map<String, JobConfig> getJobConfigMap() {
+    return _jobConfigMap;
+  }
+
+  /**
+   * Returns job config
+   *
+   * @param resource resource name of the job
+   *
+   * @return the job's config, or null if it is not cached
+   */
+  public JobConfig getJobConfig(String resource) {
+    return _jobConfigMap.get(resource);
+  }
+
+  /**
+   * Returns workflow config map
+   *
+   * @return the cache's own (mutable) map of workflow configs keyed by resource name
+   */
+  public Map<String, WorkflowConfig> getWorkflowConfigMap() {
+    return _workflowConfigMap;
+  }
+
+  /**
+   * Returns workflow config
+   *
+   * @param resource resource name of the workflow
+   *
+   * @return the workflow's config, or null if it is not cached
+   */
+  public WorkflowConfig getWorkflowConfig(String resource) {
+    return _workflowConfigMap.get(resource);
+  }
+
+  /**
+   * Return the JobContext by resource name
+   *
+   * @param resourceName resource name of the job
+   *
+   * @return a JobContext wrapping the cached record, or null if none is cached
+   */
+  public JobContext getJobContext(String resourceName) {
+    ZNRecord context = _contextMap.get(resourceName);
+    return context == null ? null : new JobContext(context);
+  }
+
+  /**
+   * Return the WorkflowContext by resource name
+   *
+   * @param resourceName resource name of the workflow
+   *
+   * @return a WorkflowContext wrapping the cached record, or null if none is cached
+   */
+  public WorkflowContext getWorkflowContext(String resourceName) {
+    ZNRecord context = _contextMap.get(resourceName);
+    return context == null ? null : new WorkflowContext(context);
+  }
+
+  /**
+   * Update context of the Job
+   */
+  public void updateJobContext(String resourceName, JobContext jobContext,
+      HelixDataAccessor accessor) {
+    updateContext(resourceName, jobContext.getRecord(), accessor);
+  }
+
+  /**
+   * Update context of the Workflow
+   */
+  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
+      HelixDataAccessor accessor) {
+    updateContext(resourceName, workflowContext.getRecord(), accessor);
+  }
+
+  /**
+   * Update context of the Workflow or Job: writes the record to its context znode and stores it in
+   * the local cache. The cache is updated even if the write fails, but the failure is logged.
+   */
+  private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) {
+    String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
+        TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
+    boolean written = accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT);
+    if (!written) {
+      // Previously ignored; the cached context would silently diverge from ZooKeeper.
+      LOG.warn("Failed to write context of " + resourceName + " to " + path);
+    }
+    _contextMap.put(resourceName, record);
+  }
+
+  /**
+   * Return map of WorkflowContexts or JobContexts
+   *
+   * @return the cache's own (mutable) map of context records; values may be null
+   */
+  public Map<String, ZNRecord> getContexts() {
+    return _contextMap;
+  }
+
+  @Override public String toString() {
+    return "TaskDataCache{" +
+        "_jobConfigMap=" + _jobConfigMap +
+        ", _workflowConfigMap=" + _workflowConfigMap +
+        ", _contextMap=" + _contextMap +
+        ", _clusterName='" + _clusterName + '\'' +
+        '}';
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 47413fc..4fa0c8c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,24 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.CurrentStateCache;
+import org.apache.helix.common.caches.InstanceMessagesCache;
+import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -60,11 +59,6 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import static org.apache.helix.HelixConstants.ChangeType;
 
 /**
@@ -87,26 +81,21 @@ public class ClusterDataCache {
   private Map<String, ResourceConfig> _resourceConfigMap;
   private Map<String, ResourceConfig> _resourceConfigCacheMap;
   private Map<String, ClusterConstraints> _constraintMap;
-  private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
-  private Map<String, Map<String, Message>> _messageMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
-  private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
-  private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
-  private Map<String, ZNRecord> _contextMap = new HashMap<>();
-  private Map<String, ExternalView> _targetExternalViewMap = Maps.newHashMap();
+  private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
 
-  // maintain a cache of participant messages across pipeline runs
-  private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
-  private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap();
+  private CurrentStateCache _currentStateCache;
+  private TaskDataCache _taskDataCache;
+  private InstanceMessagesCache _instanceMessagesCache;
 
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
-  private Map<String, ResourceAssignment>  _resourceAssignmentCache = Maps.newHashMap();
+  private Map<String, ResourceAssignment>  _resourceAssignmentCache = new HashMap<>();
 
 
   // maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs
-  private Map<String, ZNRecord>  _idealMappingCache = Maps.newHashMap();
+  private Map<String, ZNRecord>  _idealMappingCache = new HashMap<>();
 
   private Map<ChangeType, Boolean> _propertyDataChangedMap;
 
@@ -122,15 +111,18 @@ public class ClusterDataCache {
   private String _clusterName;
 
   public ClusterDataCache () {
+    this(null);
+  }
+
+  public ClusterDataCache(String clusterName) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
-    for(ChangeType type : ChangeType.values()) {
+    for (ChangeType type : ChangeType.values()) {
       _propertyDataChangedMap.put(type, Boolean.valueOf(true));
     }
-  }
-
-  public ClusterDataCache (String clusterName) {
-    this();
     _clusterName = clusterName;
+    _currentStateCache = new CurrentStateCache(_clusterName);
+    _taskDataCache = new TaskDataCache(_clusterName);
+    _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
   /**
@@ -177,18 +169,17 @@ public class ClusterDataCache {
       LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
     }
 
-    _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
-    _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+    _idealStateMap = new HashMap<>(_idealStateCacheMap);
+    _liveInstanceMap = new HashMap(_liveInstanceCacheMap);
     _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
-    _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+    _resourceConfigMap = new HashMap(_resourceConfigCacheMap);
 
     if (_updateInstanceOfflineTime) {
       updateOfflineInstanceHistory(accessor);
     }
 
     if (_isTaskCache) {
-      refreshJobContexts(accessor);
-      updateWorkflowJobConfigs();
+      _taskDataCache.refresh(accessor, _resourceConfigMap);
     }
 
     Map<String, StateModelDefinition> stateDefMap =
@@ -197,17 +188,20 @@ public class ClusterDataCache {
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
     _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
-    refreshMessages(accessor);
-    refreshCurrentStates(accessor);
 
+    _instanceMessagesCache
+        .refresh(accessor, _liveInstanceMap);
+    _currentStateCache.refresh(accessor, _liveInstanceMap);
+    
     // current state must be refreshed before refreshing relay messages
     // because we need to use current state to validate all relay messages.
-    updateRelayMessages(_messageMap);
+    _instanceMessagesCache
+        .updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap());
 
     if (_clusterConfig != null) {
       _idealStateRuleMap = _clusterConfig.getIdealStateRules();
     } else {
-      _idealStateRuleMap = Maps.newHashMap();
+      _idealStateRuleMap = new HashMap();
       LOG.warn("Cluster config is null!");
     }
 
@@ -229,7 +223,7 @@ public class ClusterDataCache {
       LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet());
       LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
       LOG.debug("ClusterConfigs: " + _clusterConfig);
-      LOG.debug("JobContexts: " + _contextMap.keySet());
+      LOG.debug("JobContexts: " + _taskDataCache.getContexts().keySet());
     }
 
     if (LOG.isTraceEnabled()) {
@@ -239,215 +233,6 @@ public class ClusterDataCache {
     return true;
   }
 
-  private void refreshMessages(HelixDataAccessor accessor) {
-    long start = System.currentTimeMillis();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    Map<String, Map<String, Message>> msgMap = new HashMap<>();
-    List<PropertyKey> newMessageKeys = Lists.newLinkedList();
-    long purgeSum = 0;
-    for (String instanceName : _liveInstanceMap.keySet()) {
-      // get the cache
-      Map<String, Message> cachedMap = _messageCache.get(instanceName);
-      if (cachedMap == null) {
-        cachedMap = Maps.newHashMap();
-        _messageCache.put(instanceName, cachedMap);
-      }
-      msgMap.put(instanceName, cachedMap);
-
-      // get the current names
-      Set<String> messageNames =
-          Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
-
-      long purgeStart = System.currentTimeMillis();
-      // clear stale names
-      Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
-      while (cachedNamesIter.hasNext()) {
-        String messageName = cachedNamesIter.next();
-        if (!messageNames.contains(messageName)) {
-          cachedNamesIter.remove();
-        }
-      }
-      long purgeEnd = System.currentTimeMillis();
-      purgeSum += purgeEnd - purgeStart;
-
-      // get the keys for the new messages
-      for (String messageName : messageNames) {
-        if (!cachedMap.containsKey(messageName)) {
-          newMessageKeys.add(keyBuilder.message(instanceName, messageName));
-        }
-      }
-    }
-
-    // get the new messages
-    if (newMessageKeys.size() > 0) {
-      List<Message> newMessages = accessor.getProperty(newMessageKeys);
-      for (Message message : newMessages) {
-        if (message != null) {
-          Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
-          cachedMap.put(message.getId(), message);
-        }
-      }
-    }
-
-    _messageMap = Collections.unmodifiableMap(msgMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Message purge took: " + purgeSum);
-      LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + (
-          System.currentTimeMillis() - start) + " ms.");
-    }
-  }
-
-  // update all valid relay messages attached to existing state transition messages into message map.
-  private void updateRelayMessages(Map<String, Map<String, Message>> messageMap) {
-    List<Message> relayMessages = new ArrayList<>();
-    for (String instance : messageMap.keySet()) {
-      Map<String, Message> instanceMessages = messageMap.get(instance);
-      Map<String, Map<String, CurrentState>> instanceCurrentStateMap = _currentStateMap.get(instance);
-      if (instanceCurrentStateMap == null) {
-        continue;
-      }
-
-      for (Message message : instanceMessages.values()) {
-        if (message.hasRelayMessages()) {
-          String sessionId = message.getTgtSessionId();
-          String resourceName = message.getResourceName();
-          String partitionName = message.getPartitionName();
-          String targetState = message.getToState();
-          String instanceSessionId = _liveInstanceMap.get(instance).getSessionId();
-
-          if (!instanceSessionId.equals(sessionId)) {
-            continue;
-          }
-
-          Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
-          if (sessionCurrentStateMap == null) {
-            continue;
-          }
-          CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-          if (currentState == null || !targetState.equals(currentState.getState(partitionName))) {
-            continue;
-          }
-          long transitionCompleteTime = currentState.getEndTime(partitionName);
-
-          for (Message msg : message.getRelayMessages().values()) {
-            msg.setRelayTime(transitionCompleteTime);
-            if (!message.isExpired()) {
-              relayMessages.add(msg);
-            }
-          }
-        }
-      }
-    }
-
-    for (Message message : relayMessages) {
-      String instance = message.getTgtName();
-      Map<String, Message> instanceMessages = messageMap.get(instance);
-      if (instanceMessages == null) {
-        instanceMessages = new HashMap<>();
-        messageMap.put(instance, instanceMessages);
-      }
-      instanceMessages.put(message.getId(), message);
-    }
-  }
-
-  private void refreshCurrentStates(HelixDataAccessor accessor) {
-    refreshCurrentStatesCache(accessor);
-
-    Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>();
-    for (PropertyKey key : _currentStateCache.keySet()) {
-      CurrentState currentState = _currentStateCache.get(key);
-      String[] params = key.getParams();
-      if (currentState != null && params.length >= 4) {
-        String instanceName = params[1];
-        String sessionId = params[2];
-        String stateName = params[3];
-        Map<String, Map<String, CurrentState>> instanceCurStateMap =
-            allCurStateMap.get(instanceName);
-        if (instanceCurStateMap == null) {
-          instanceCurStateMap = Maps.newHashMap();
-          allCurStateMap.put(instanceName, instanceCurStateMap);
-        }
-        Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
-        if (sessionCurStateMap == null) {
-          sessionCurStateMap = Maps.newHashMap();
-          instanceCurStateMap.put(sessionId, sessionCurStateMap);
-        }
-        sessionCurStateMap.put(stateName, currentState);
-      }
-    }
-
-    for (String instance : allCurStateMap.keySet()) {
-      allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance)));
-    }
-    _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
-  }
-
-  // reload current states that has been changed from zk to local cache.
-  private void refreshCurrentStatesCache(HelixDataAccessor accessor) {
-    long start = System.currentTimeMillis();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    List<PropertyKey> currentStateKeys = Lists.newLinkedList();
-    for (String instanceName : _liveInstanceMap.keySet()) {
-      LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
-      String sessionId = liveInstance.getSessionId();
-      List<String> currentStateNames =
-          accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
-      for (String currentStateName : currentStateNames) {
-        currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
-      }
-    }
-
-    // All new entries from zk not cached locally yet should be read from ZK.
-    List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys);
-    reloadKeys.removeAll(_currentStateCache.keySet());
-
-    List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet());
-    cachedKeys.retainAll(currentStateKeys);
-
-    List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
-    Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap();
-    for (int i=0; i < cachedKeys.size(); i++) {
-      PropertyKey key = cachedKeys.get(i);
-      HelixProperty.Stat stat = stats.get(i);
-      if (stat != null) {
-        CurrentState property = _currentStateCache.get(key);
-        if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) {
-          currentStatesMap.put(key, property);
-        } else {
-          // need update from zk
-          reloadKeys.add(key);
-        }
-      } else {
-        LOG.debug("stat is null for key: " + key);
-        reloadKeys.add(key);
-      }
-    }
-
-    List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
-    Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
-    for (CurrentState currentState : currentStates) {
-      PropertyKey key = csKeyIter.next();
-      if (currentState != null) {
-        currentStatesMap.put(key, currentState);
-      } else {
-        LOG.debug("CurrentState null for key: " + key);
-      }
-    }
-
-    _currentStateCache = Collections.unmodifiableMap(currentStatesMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size());
-      LOG.debug(
-          "# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size() - reloadKeys.size()));
-    }
-    LOG.info(
-        "Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!");
-  }
-
   private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
     List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet());
     offlineNodes.removeAll(_liveInstanceMap.keySet());
@@ -502,7 +287,7 @@ public class ClusterDataCache {
   }
 
   public synchronized void setIdealStates(List<IdealState> idealStates) {
-    Map<String, IdealState> idealStateMap = Maps.newHashMap();
+    Map<String, IdealState> idealStateMap = new HashMap();
     for (IdealState idealState : idealStates) {
       idealStateMap.put(idealState.getId(), idealState);
     }
@@ -588,7 +373,7 @@ public class ClusterDataCache {
 
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+    Map<String, LiveInstance> liveInstanceMap = new HashMap();
     for (LiveInstance liveInstance : liveInstances) {
       liveInstanceMap.put(liveInstance.getId(), liveInstance);
     }
@@ -597,18 +382,16 @@ public class ClusterDataCache {
   }
 
   /**
-   * Provides the current state of the node for a given session id,
-   * the sessionid can be got from LiveInstance
+   * Provides the current state of the node for a given session id, the sessionid can be got from
+   * LiveInstance
+   *
    * @param instanceName
    * @param clientSessionId
+   *
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
-    if (!_currentStateMap.containsKey(instanceName)
-        || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
-      return Collections.emptyMap();
-    }
-    return _currentStateMap.get(instanceName).get(clientSessionId);
+    return _currentStateCache.getCurrentState(instanceName, clientSessionId);
   }
 
   /**
@@ -617,26 +400,11 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, Message> getMessages(String instanceName) {
-    Map<String, Message> map = _messageMap.get(instanceName);
-    if (map != null) {
-      return map;
-    } else {
-      return Collections.emptyMap();
-    }
+    return _instanceMessagesCache.getMessages(instanceName);
   }
 
   public void cacheMessages(List<Message> messages) {
-    for (Message message : messages) {
-      String instanceName = message.getTgtName();
-      Map<String, Message> instMsgMap;
-      if (_messageCache.containsKey(instanceName)) {
-        instMsgMap = _messageCache.get(instanceName);
-      } else {
-        instMsgMap = Maps.newHashMap();
-        _messageCache.put(instanceName, instMsgMap);
-      }
-      instMsgMap.put(message.getId(), message);
-    }
+    _instanceMessagesCache.cacheMessages(messages);
   }
 
   /**
@@ -721,7 +489,7 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, JobConfig> getJobConfigMap() {
-    return _jobConfigMap;
+    return _taskDataCache.getJobConfigMap();
   }
 
   /**
@@ -730,7 +498,7 @@ public class ClusterDataCache {
    * @return
    */
   public JobConfig getJobConfig(String resource) {
-    return _jobConfigMap.get(resource);
+    return _taskDataCache.getJobConfig(resource);
   }
 
   /**
@@ -738,7 +506,7 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, WorkflowConfig> getWorkflowConfigMap() {
-    return _workflowConfigMap;
+    return _taskDataCache.getWorkflowConfigMap();
   }
 
   /**
@@ -747,12 +515,12 @@ public class ClusterDataCache {
    * @return
    */
   public WorkflowConfig getWorkflowConfig(String resource) {
-    return _workflowConfigMap.get(resource);
+    return _taskDataCache.getWorkflowConfig(resource);
   }
 
 
   public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
-    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
     }
@@ -881,10 +649,7 @@ public class ClusterDataCache {
    * @return
    */
   public JobContext getJobContext(String resourceName) {
-    if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
-      return new JobContext(_contextMap.get(resourceName));
-    }
-    return null;
+    return _taskDataCache.getJobContext(resourceName);
   }
 
   /**
@@ -893,10 +658,7 @@ public class ClusterDataCache {
    * @return
    */
   public WorkflowContext getWorkflowContext(String resourceName) {
-    if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
-      return new WorkflowContext(_contextMap.get(resourceName));
-    }
-    return null;
+    return _taskDataCache.getWorkflowContext(resourceName);
   }
 
   /**
@@ -904,7 +666,7 @@ public class ClusterDataCache {
    */
   public void updateJobContext(String resourceName, JobContext jobContext,
       HelixDataAccessor accessor) {
-    updateContext(resourceName, jobContext.getRecord(), accessor);
+    _taskDataCache.updateJobContext(resourceName, jobContext, accessor);
   }
 
   /**
@@ -912,17 +674,7 @@ public class ClusterDataCache {
    */
   public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
       HelixDataAccessor accessor) {
-    updateContext(resourceName, workflowContext.getRecord(), accessor);
-  }
-
-  /**
-   * Update context of the Workflow or Job
-   */
-  private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) {
-    String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
-        TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
-    accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT);
-    _contextMap.put(resourceName, record);
+    _taskDataCache.updateWorkflowContext(resourceName, workflowContext, accessor);
   }
 
   /**
@@ -930,7 +682,7 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, ZNRecord> getContexts() {
-    return _contextMap;
+    return _taskDataCache.getContexts();
   }
 
   public ExternalView getTargetExternalView(String resourceName) {
@@ -1066,61 +818,11 @@ public class ClusterDataCache {
     sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
     sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
     sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n");
-    sb.append("jobContextMap:" + _contextMap).append("\n");
-    sb.append("messageMap:" + _messageMap).append("\n");
-    sb.append("currentStateMap:" + _currentStateMap).append("\n");
+    sb.append("taskDataCache:" + _taskDataCache).append("\n");
+    sb.append("messageCache:" + _instanceMessagesCache).append("\n");
+    sb.append("currentStateCache:" + _currentStateCache).append("\n");
     sb.append("clusterConfig:" + _clusterConfig).append("\n");
 
     return sb.toString();
   }
-
-  private void refreshJobContexts(HelixDataAccessor accessor) {
-    // TODO: Need an optimize for reading context only if the refresh is needed.
-    long start = System.currentTimeMillis();
-    _contextMap.clear();
-    if (_clusterName == null) {
-      return;
-    }
-    String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
-        TaskConstants.REBALANCER_CONTEXT_ROOT);
-    List<String> contextPaths = new ArrayList<>();
-    List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
-    if (childNames == null) {
-      return;
-    }
-    for (String context : childNames) {
-      contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE));
-    }
-
-    List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
-    for (int i = 0; i < contexts.size(); i++) {
-      ZNRecord context = contexts.get(i);
-      if (context != null && context.getSimpleField(NAME) != null) {
-        _contextMap.put(context.getSimpleField(NAME), context);
-      } else {
-        _contextMap.put(childNames.get(i), context);
-        LOG.debug(
-            String.format("Context for %s is null or miss the context NAME!", childNames.get((i))));
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
-          System.currentTimeMillis() - start) + " ms");
-    }
-  }
-
-  private void updateWorkflowJobConfigs() {
-    _workflowConfigMap.clear();
-    _jobConfigMap.clear();
-    for (Map.Entry<String, ResourceConfig> entry : _resourceConfigMap.entrySet()) {
-      if (entry.getValue().getRecord().getSimpleFields()
-          .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
-        _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue()));
-      } else if (entry.getValue().getRecord().getSimpleFields()
-          .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
-        _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 5602333..332cd8a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -21,7 +21,7 @@ package org.apache.helix.spectator;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyType;
-import org.apache.helix.common.BasicClusterDataCache;
+import org.apache.helix.common.caches.BasicClusterDataCache;
 
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.