Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 01:16:00 UTC

[1/4] helix git commit: Refactor WorkflowRebalancer to WorkflowHandler

Repository: helix
Updated Branches:
  refs/heads/master cf010f904 -> 9d7364d7a


Refactor WorkflowRebalancer to WorkflowHandler

The current WorkflowRebalancer is a bit messy: it mixes workflow status updates and scheduling logic together. Refactor WorkflowRebalancer to WorkflowHandler so that scheduling the workflow and updating its status happen independently of each other.

Also remove redundant logic from the existing pipeline.
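
For illustration, the rebalancer ends up as a thin wrapper around the new dispatcher. Below is a condensed sketch of the resulting control flow; it mirrors the WorkflowRebalancer diff further down, with the monitor wiring and the timing log elided:

    public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
        IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
      String workflow = resource.getResourceName();
      _workflowDispatcher.init(_manager);
      WorkflowContext workflowCtx = _workflowDispatcher
          .getOrInitializeWorkflowContext(workflow, clusterData.getTaskDataCache());
      WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflow);
      _workflowDispatcher.updateCache(clusterData.getTaskDataCache());
      // Status update and scheduling are now two independent steps:
      _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx);
      _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx);
      return buildEmptyAssignment(workflow, currStateOutput);
    }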


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0c3ac37b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0c3ac37b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0c3ac37b

Branch: refs/heads/master
Commit: 0c3ac37b0b442f20d08eaba86da7d94ec1494d1f
Parents: cf010f9
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Sep 12 18:31:38 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 18:05:06 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      |  27 +-
 .../helix/task/AbstractTaskDispatcher.java      | 129 ++++-
 .../org/apache/helix/task/JobRebalancer.java    |   2 +-
 .../org/apache/helix/task/TaskRebalancer.java   | 120 -----
 .../apache/helix/task/WorkflowDispatcher.java   | 526 +++++++++++++++++++
 .../apache/helix/task/WorkflowRebalancer.java   | 510 +-----------------
 .../scripts/integration-test/script/pexpect.py  |  12 +-
 .../task/TestIndependentTaskRebalancer.java     |   2 +-
 .../integration/task/TestRecurringJobQueue.java |  26 +-
 9 files changed, 701 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 5d39512..2c42aca 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -214,14 +214,14 @@ public class TaskDataCache extends AbstractDataCache {
     _contextToUpdate.removeAll(_contextToRemove);
     List<String> contextUpdateNames = new ArrayList<>(_contextToUpdate);
     for (String resourceName : contextUpdateNames) {
-      if (_contextMap.get(resourceName) != null && !_contextToRemove.contains(resourceName)) {
+      if (_contextMap.get(resourceName) != null) {
         contextUpdatePaths.add(getContextPath(resourceName));
         contextUpdateData.add(_contextMap.get(resourceName));
       }
     }
 
-    boolean[] updateSuccess = accessor.getBaseDataAccessor()
-        .setChildren(contextUpdatePaths, contextUpdateData, AccessOption.PERSISTENT);
+    boolean[] updateSuccess =
+        accessor.getBaseDataAccessor().setChildren(contextUpdatePaths, contextUpdateData, AccessOption.PERSISTENT);
 
     for (int i = 0; i < updateSuccess.length; i++) {
       if (updateSuccess[i]) {
@@ -230,18 +230,21 @@ public class TaskDataCache extends AbstractDataCache {
     }
 
     // Delete contexts
-    List<String> contextToRemove = new ArrayList<>();
-    List<String> contextToRemoveNames = new ArrayList<>(_contextToRemove);
-    for (String resourceName : contextToRemoveNames) {
-      contextToRemove.add(getContextPath(resourceName));
+    // We cannot leave the contexts here, since some of the deletions happen when cleaning up a workflow.
+    // If a context is left in memory, Helix will not allow the user to create a workflow with the same name.
+    // TODO: Add a periodic cleanup thread that can remove contexts whose deletion failed.
+    List<String> contextPathsToRemove = new ArrayList<>();
+    List<String> contextNamesToRemove = new ArrayList<>(_contextToRemove);
+    for (String resourceName : contextNamesToRemove) {
+      contextPathsToRemove.add(getContextPath(resourceName));
     }
 
+    // TODO: the current behavior is that deleting non-existing data returns false.
+    // Once that behavior is fixed, we can add the retry logic back. Otherwise, the context stays in
+    // memory and prevents recreating a workflow with the same name.
+    accessor.getBaseDataAccessor().remove(contextPathsToRemove, AccessOption.PERSISTENT);
 
-    // Current implementation is stateless operation, since Helix read all the contexts back
-    // and redo the works. If it is failed to remove this round, it could be removed in next round.
-
-    // Also if the context has already been removed, it should be fine.
-    accessor.getBaseDataAccessor().remove(contextToRemove, AccessOption.PERSISTENT);
+    _contextToRemove.clear();
   }
 
   /**

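The TODO above hints at a periodic cleanup thread for contexts whose deletion failed. A minimal standalone sketch of that idea follows; the FailedContextCleaner class, the 5-minute interval, and removePath() are assumptions for illustration (removePath() stands in for accessor.getBaseDataAccessor().remove(..., AccessOption.PERSISTENT), it is not an existing Helix API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical periodic cleaner: retries removal of context paths whose
    // earlier deletion returned false, so a stale context cannot block
    // recreating a workflow with the same name.
    class FailedContextCleaner {
      private final List<String> _failedPaths = new ArrayList<>();
      private final ScheduledExecutorService _executor =
          Executors.newSingleThreadScheduledExecutor();

      void start() {
        _executor.scheduleAtFixedRate(this::retryRemove, 5, 5, TimeUnit.MINUTES);
      }

      synchronized void recordFailure(String contextPath) {
        _failedPaths.add(contextPath);
      }

      private synchronized void retryRemove() {
        // Drop every path whose removal now succeeds; keep the rest for the next round.
        _failedPaths.removeIf(this::removePath);
      }

      private boolean removePath(String path) {
        return true; // placeholder for the actual ZooKeeper removal call
      }
    }
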
http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index bf33d77..617263b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1,15 +1,19 @@
 package org.apache.helix.task;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
+import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -412,7 +416,7 @@ public abstract class AbstractTaskDispatcher {
       WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap,
       ClusterDataCache clusterDataCache) {
     markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap,
-        clusterDataCache);
+        clusterDataCache.getTaskDataCache());
     // Mark all INIT task to TASK_ABORTED
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
@@ -752,7 +756,7 @@ public abstract class AbstractTaskDispatcher {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.COMPLETED);
     jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache)) {
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache.getTaskDataCache())) {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
@@ -761,7 +765,7 @@ public abstract class AbstractTaskDispatcher {
 
   protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
+      TaskDataCache clusterDataCache) {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.FAILED);
     if (jobContext != null) {
@@ -799,7 +803,7 @@ public abstract class AbstractTaskDispatcher {
    *         returns false otherwise.
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
+      Map<String, JobConfig> jobConfigMap, TaskDataCache clusterDataCache) {
     boolean incomplete = false;
 
     TaskState workflowState = ctx.getWorkflowState();
@@ -963,4 +967,121 @@ public abstract class AbstractTaskDispatcher {
     // This is a targeted task
     return pName(jobCfg.getJobId(), partitionNum);
   }
+
+  /**
+   * Checks if the workflow has been stopped.
+   * @param ctx Workflow context containing task states
+   * @param cfg Workflow config containing set of tasks
+   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
+   */
+  protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState != null
+          && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected ResourceAssignment buildEmptyAssignment(String name,
+      CurrentStateOutput currStateOutput) {
+    ResourceAssignment assignment = new ResourceAssignment(name);
+    Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
+    for (Partition partition : partitions) {
+      Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
+      Map<String, String> replicaMap = Maps.newHashMap();
+      for (String instanceName : currentStateMap.keySet()) {
+        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+    }
+    return assignment;
+  }
+
+  /**
+   * Check all the dependencies of a job to determine whether the job is ready to be scheduled.
+   * @param job the job to check
+   * @param workflowCfg the workflow configuration containing the job DAG
+   * @param workflowCtx the workflow context holding the current job states
+   * @return true if the job's dependencies allow it to be scheduled, false otherwise
+   */
+  protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap,
+      TaskDataCache clusterDataCache) {
+    int notStartedCount = 0;
+    int failedOrTimeoutCount = 0;
+    int incompleteParentCount = 0;
+
+    for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
+      TaskState jobState = workflowCtx.getJobState(parent);
+      if (jobState == null || jobState == TaskState.NOT_STARTED) {
+        ++notStartedCount;
+      } else if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
+        ++failedOrTimeoutCount;
+      } else if (jobState != TaskState.COMPLETED) {
+        incompleteParentCount++;
+      }
+    }
+
+    // If there is any parent job not started, this job should not be scheduled
+    if (notStartedCount > 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job,
+            notStartedCount));
+      }
+      return false;
+    }
+
+    // If there is parent job failed, schedule the job only when ignore dependent
+    // job failure enabled
+    JobConfig jobConfig = jobConfigMap.get(job);
+    if (jobConfig == null) {
+      LOG.error(String.format("The job config is missing for job %s", job));
+      return false;
+    }
+    if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
+      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job,
+            failedOrTimeoutCount));
+      }
+      return false;
+    }
+
+    if (workflowCfg.isJobQueue()) {
+      // If job comes from a JobQueue, it should apply the parallel job logics
+      if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,
+              incompleteAllCount));
+        }
+        return false;
+      }
+    } else {
+      // If this job comes from a generic workflow, job will not be scheduled until
+      // all the direct parent jobs finished
+      if (incompleteParentCount > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job,
+              incompleteParentCount));
+        }
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Check if a workflow is ready to schedule.
+   * @param workflowCfg the workflow to check
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
+    Date startTime = workflowCfg.getStartTime();
+    // A workflow with no schedule config, or whose start time has passed, is ready to schedule.
+    return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index abc260a..143053e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -112,7 +112,7 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        clusterData.getJobConfigMap(), clusterData)) {
+        clusterData.getJobConfigMap(), clusterData.getTaskDataCache())) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2f88e24..e75fa82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -54,127 +54,7 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
   @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput);
 
-  /**
-   * Checks if the workflow has been stopped.
-   *
-   * @param ctx Workflow context containing task states
-   * @param cfg Workflow config containing set of tasks
-   *
-   * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
-   */
-  protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      TaskState jobState = ctx.getJobState(job);
-      if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  protected ResourceAssignment buildEmptyAssignment(String name,
-      CurrentStateOutput currStateOutput) {
-    ResourceAssignment assignment = new ResourceAssignment(name);
-    Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
-    for (Partition partition : partitions) {
-      Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
-      Map<String, String> replicaMap = Maps.newHashMap();
-      for (String instanceName : currentStateMap.keySet()) {
-        replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
-      }
-      assignment.addReplicaMap(partition, replicaMap);
-    }
-    return assignment;
-  }
-
-  /**
-   * Check all the dependencies of a job to determine whether the job is ready to be scheduled.
-   *
-   * @param job
-   * @param workflowCfg
-   * @param workflowCtx
-   *
-   * @return
-   */
-  protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
-    int notStartedCount = 0;
-    int failedOrTimeoutCount = 0;
-    int incompleteParentCount = 0;
-
-    for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
-      TaskState jobState = workflowCtx.getJobState(parent);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
-        ++failedOrTimeoutCount;
-      } else if (jobState != TaskState.COMPLETED) {
-        incompleteParentCount++;
-      }
-    }
-
-    // If there is any parent job not started, this job should not be scheduled
-    if (notStartedCount > 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
-      }
-      return false;
-    }
 
-    // If there is parent job failed, schedule the job only when ignore dependent
-    // job failure enabled
-    JobConfig jobConfig = jobConfigMap.get(job);
-    if (jobConfig == null) {
-      LOG.error(String.format("The job config is missing for job %s", job));
-      return false;
-    }
-    if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
-      markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount));
-      }
-      return false;
-    }
-
-    if (workflowCfg.isJobQueue()) {
-      // If job comes from a JobQueue, it should apply the parallel job logics
-      if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,
-              incompleteAllCount));
-        }
-        return false;
-      }
-    } else {
-      // If this job comes from a generic workflow, job will not be scheduled until
-      // all the direct parent jobs finished
-      if (incompleteParentCount > 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job,
-              incompleteParentCount));
-        }
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule.
-   *
-   * @param workflowCfg the workflow to check
-   *
-   * @return true if the workflow is ready for schedule, false if not ready
-   */
-  protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
-    Date startTime = workflowCfg.getStartTime();
-    // Workflow with non-scheduled config or passed start time is ready to schedule.
-    return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
-  }
 
   @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
new file mode 100644
index 0000000..db85a14
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -0,0 +1,526 @@
+package org.apache.helix.task;
+
+import com.google.common.collect.Lists;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkflowDispatcher extends AbstractTaskDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(WorkflowDispatcher.class);
+  private static final Set<TaskState> finalStates = new HashSet<>(
+      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT));
+  private TaskDataCache _taskDataCache;
+
+  public void updateCache(TaskDataCache cache) {
+    _taskDataCache = cache;
+  }
+
+  // Split into a status update step and an assignment step; a few pieces of
+  // data need to be passed around between the two.
+  public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+
+    // Fetch workflow configuration and context
+    if (workflowCfg == null) {
+      LOG.warn("Workflow configuration is NULL for " + workflow);
+      return;
+    }
+
+    // Step 1: Check for deletion - if so, we don't need to go through further steps
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+      cleanupWorkflow(workflow);
+      return;
+    }
+
+    // Step 2: handle timeout, which should have higher priority than STOP
+    // Only generic workflows can time out, and only they schedule a rebalance for the timeout.
+    // Skip setting the state if the workflow has already timed out; a job queue ignores this step.
+    if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
+      // If timeout point has already been passed, it will not be scheduled
+      scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
+
+      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+        workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
+        _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+      }
+
+      // We should not return after setting the timeout: if the workflow is already stopped,
+      // marking it timed-out will not trigger the rebalance pipeline (we are not listening on
+      // PropertyStore changes), nor will a rebalance be scheduled for the timeout since, at
+      // this point, the workflow has already timed out. Let the code proceed so that future
+      // cleanup work can be scheduled.
+    }
+
+    // Step 3: handle workflow that should STOP
+    // For workflows that already reached final states, STOP should not take into effect
+    if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP.equals(targetState)) {
+      LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+      }
+      return;
+    }
+
+    long currentTime = System.currentTimeMillis();
+
+    // Step 4: Check and process a finished workflow context (somewhat confusing,
+    // because the check happens inside isWorkflowFinished())
+    // Check if workflow has been finished and mark it if it is. Also update cluster status
+    // monitor if provided
+    // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed
+    // This is to handle TIMED_OUT only
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
+        workflowCfg, _taskDataCache.getJobConfigMap(), _taskDataCache)) {
+      workflowCtx.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowCtx, workflowCfg);
+      _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+    }
+
+    // Step 5: Handle finished workflows
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+      LOG.info("Workflow " + workflow + " is finished.");
+      long expiryTime = workflowCfg.getExpiry();
+      // Check if this workflow has been finished past its expiry.
+      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
+        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
+        cleanupWorkflow(workflow);
+      } else {
+        // schedule future cleanup work
+        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
+        _rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
+      }
+      return;
+    }
+
+    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+      Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
+      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
+      if (jobWithFinalStates.size() > 0) {
+        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
+        workflowCtx.removeJobStates(jobWithFinalStates);
+        workflowCtx.removeJobStartTime(jobWithFinalStates);
+      }
+    }
+
+    _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+  }
+
+  public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
+    // Fetch workflow configuration and context
+    if (workflowCfg == null) {
+      // Already logged in status update.
+      return;
+    }
+
+    if (!isWorkflowReadyForSchedule(workflowCfg)) {
+      LOG.info("Workflow " + workflow + " is not ready to schedule");
+      // set the timer to trigger future schedule
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
+          workflowCfg.getStartTime().getTime());
+      return;
+    }
+
+
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx, _taskDataCache);
+    if (isReady) {
+      // Schedule jobs from this workflow.
+      scheduleJobs(workflow, workflowCfg, workflowCtx, _taskDataCache.getJobConfigMap(), _taskDataCache);
+    } else {
+      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+    }
+    _taskDataCache.updateWorkflowContext(workflow, workflowCtx);
+  }
+
+  public WorkflowContext getOrInitializeWorkflowContext(
+      String workflowName, TaskDataCache cache) {
+    WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+      workflowCtx.setName(workflowName);
+      LOG.debug("Workflow context is created for " + workflowName);
+    }
+    return workflowCtx;
+  }
+
+  /**
+   * Figure out whether the jobs in the workflow should be run,
+   * and if it's ready, then just schedule it
+   */
+  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
+      TaskDataCache clusterDataCache) {
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    if (scheduleConfig != null && scheduleConfig.isRecurring()) {
+      LOG.debug("Jobs from recurring workflow are not schedule-able");
+      return;
+    }
+
+    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
+    int scheduledJobs = 0;
+    long timeToSchedule = Long.MAX_VALUE;
+    for (String job : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(job);
+      if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Job " + job + " is already started or completed.");
+        }
+        continue;
+      }
+
+      if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Workflow %s already have enough job in progress, "
+              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+        }
+        break;
+      }
+
+      // check ancestor job status
+      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
+          clusterDataCache)) {
+        JobConfig jobConfig = jobConfigMap.get(job);
+        if (jobConfig == null) {
+          LOG.error(String.format("The job config is missing for job %s", job));
+          continue;
+        }
+
+        // Since the start time is calculated based on the completion time of this job's parent
+        // jobs, it should only be calculated once. Persist the calculated time in the
+        // WorkflowContext znode.
+        long calculatedStartTime = workflowCtx.getJobStartTime(job);
+        if (calculatedStartTime < 0) {
+          // Calculate the start time if it is not already calculated
+          calculatedStartTime = System.currentTimeMillis();
+          // If the start time is not calculated before, do the math.
+          if (jobConfig.getExecutionDelay() >= 0) {
+            calculatedStartTime += jobConfig.getExecutionDelay();
+          }
+          calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
+          workflowCtx.setJobStartTime(job, calculatedStartTime);
+        }
+
+        // Time is not ready. Set a trigger and update the start time.
+        if (System.currentTimeMillis() < calculatedStartTime) {
+          timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
+        } else {
+          scheduleSingleJob(job, jobConfig);
+          workflowCtx.setJobState(job, TaskState.NOT_STARTED);
+          scheduledJobs++;
+        }
+      }
+    }
+    long currentScheduledTime =
+        _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
+            : _rebalanceScheduler.getRebalanceTime(workflow);
+    if (timeToSchedule < currentScheduledTime) {
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow, timeToSchedule);
+    }
+  }
+
+  /**
+   * Posts new job to cluster
+   */
+  private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
+    HelixAdmin admin = _manager.getClusterManagmentTool();
+
+    IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
+    if (jobIS != null) {
+      LOG.info("Job " + jobResource + " idealstate already exists!");
+      return;
+    }
+
+    // Set up job resource based on partitions from target resource
+    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
+        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
+
+    int numPartitions = numIndependentTasks;
+    if (numPartitions == 0) {
+      IdealState targetIs =
+          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
+      if (targetIs == null) {
+        LOG.warn("Target resource does not exist for job " + jobResource);
+        // do not need to fail here, the job will be marked as failure immediately when job starts
+        // running.
+      } else {
+        numPartitions = targetIs.getPartitionSet().size();
+      }
+    }
+
+    admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
+        TaskConstants.STATE_MODEL_NAME);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
+    // Push out new ideal state based on number of target partitions
+    IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
+    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
+    builder.setNumReplica(1);
+    builder.setNumPartitions(numPartitions);
+    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+
+    if (jobConfig.getInstanceGroupTag() != null) {
+      builder.setNodeGroup(jobConfig.getInstanceGroupTag());
+    }
+
+    if (jobConfig.isDisableExternalView()) {
+      builder.disableExternalView();
+    }
+
+    jobIS = builder.build();
+    for (int i = 0; i < numPartitions; i++) {
+      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
+      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
+    }
+    jobIS.setRebalancerClassName(JobRebalancer.class.getName());
+    admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
+  }
+
+  /**
+   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
+   * @param workflow the Helix resource associated with the workflow
+   * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx, TaskDataCache cache) {
+    // non-scheduled workflow is ready to run immediately.
+    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+      return true;
+    }
+
+    // Figure out when this should be run, and if it's ready, then just run it
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    Date startTime = scheduleConfig.getStartTime();
+    long currentTime = new Date().getTime();
+    long delayFromStart = startTime.getTime() - currentTime;
+
+    if (delayFromStart <= 0) {
+      // Recurring workflows are just templates that spawn new workflows
+      if (scheduleConfig.isRecurring()) {
+        // Skip scheduling this workflow if it's not in a start state
+        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
+          }
+          return false;
+        }
+
+        // Skip scheduling this workflow again if the previous run (if any) is still active
+        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+        if (lastScheduled != null) {
+          WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+            return false;
+          }
+        }
+
+        // Figure out how many jumps are needed, thus the time to schedule the next workflow
+        // The negative of the delay is the amount of time past the start time
+        long period =
+            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+        long offsetMultiplier = (-delayFromStart) / period;
+        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+        // Now clone the workflow if this clone has not yet been created
+        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ready to start workflow " + newWorkflowName);
+        }
+        if (lastScheduled == null || !newWorkflowName.equals(lastScheduled)) {
+          Workflow clonedWf =
+              cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
+          TaskDriver driver = new TaskDriver(_manager);
+          if (clonedWf != null) {
+            try {
+              // Start the cloned workflow
+              driver.start(clonedWf);
+            } catch (Exception e) {
+              LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
+            }
+          }
+          // Persist workflow start regardless of success to avoid retrying and failing
+          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+        }
+
+        // Change the time to trigger the pipeline to that of the next run
+        _rebalanceScheduler.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
+      } else {
+        // one-time workflow.
+        // Remove any timers that are past-time for this workflow
+        long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
+        if (scheduledTime > 0 && currentTime > scheduledTime) {
+          _rebalanceScheduler.removeScheduledRebalance(workflow);
+        }
+        return true;
+      }
+    } else {
+      // set the timer to trigger future schedule
+      _rebalanceScheduler.scheduleRebalance(_manager, workflow, startTime.getTime());
+    }
+
+    return false;
+  }
+
+  /**
+   * Create a new workflow based on an existing one
+   * @param manager connection to Helix
+   * @param origWorkflowName the name of the existing workflow
+   * @param newWorkflowName the name of the new workflow
+   * @param newStartTime a provided start time that deviates from the desired start time
+   * @return the cloned workflow, or null if there was a problem cloning the existing one
+   */
+  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
+      String newWorkflowName, Date newStartTime) {
+    // Read all resources, including the workflow and jobs of interest
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    if (!resourceConfigMap.containsKey(origWorkflowName)) {
+      LOG.error("No such workflow named " + origWorkflowName);
+      return null;
+    }
+    if (resourceConfigMap.containsKey(newWorkflowName)) {
+      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      return null;
+    }
+
+    // Create a new workflow with a new name
+    Map<String, String> workflowConfigsMap =
+        resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
+    WorkflowConfig.Builder workflowConfigBlder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
+
+    // Set the schedule, if applicable
+    if (newStartTime != null) {
+      ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+      workflowConfigBlder.setScheduleConfig(scheduleConfig);
+    }
+    workflowConfigBlder.setTerminable(true);
+
+    WorkflowConfig workflowConfig = workflowConfigBlder.build();
+
+    JobDag jobDag = workflowConfig.getJobDag();
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
+    workflowBuilder.setWorkflowConfig(workflowConfig);
+
+    // Add each job back as long as the original exists
+    Set<String> namespacedJobs = jobDag.getAllNodes();
+    for (String namespacedJob : namespacedJobs) {
+      if (resourceConfigMap.containsKey(namespacedJob)) {
+        // Copy over job-level and task-level configs
+        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
+        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
+
+        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
+        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
+        List<TaskConfig> taskConfigs = Lists.newLinkedList();
+        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
+          taskConfigs.add(taskConfig);
+        }
+        jobCfgBuilder.addTaskConfigs(taskConfigs);
+        workflowBuilder.addJob(job, jobCfgBuilder);
+
+        // Add dag dependencies
+        Set<String> children = parentsToChildren.get(namespacedJob);
+        if (children != null) {
+          for (String namespacedChild : children) {
+            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
+            workflowBuilder.addParentChildDependency(job, child);
+          }
+        }
+      }
+    }
+    return workflowBuilder.build();
+  }
+
+  /**
+   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
+   * context associated with this workflow, and all job information, including their configs,
+   * contexts, idealstates and externalviews.
+   */
+  private void cleanupWorkflow(String workflow) {
+    LOG.info("Cleaning up workflow: " + workflow);
+    WorkflowConfig workflowcfg = _taskDataCache.getWorkflowConfig(workflow);
+
+    if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
+      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+      // Remove all pending timer tasks for this workflow if exists
+      _rebalanceScheduler.removeScheduledRebalance(workflow);
+      for (String job : jobs) {
+        _rebalanceScheduler.removeScheduledRebalance(job);
+      }
+      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
+          _manager.getHelixPropertyStore(), workflow, jobs)) {
+        LOG.warn("Failed to clean up workflow " + workflow);
+      } else {
+        // Only remove from the cache when removing the whole workflow succeeds. Otherwise, the
+        // batch write would clean all the contexts even though the Configs and IdealStates still
+        // exist, and all the workflows and jobs would be rescheduled again.
+        removeContexts(workflow, jobs, _taskDataCache);
+      }
+    } else {
+      LOG.info("Did not clean up workflow " + workflow
+          + " because it is neither terminable nor marked for DELETE.");
+    }
+  }
+
+  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
+    if (jobs != null) {
+      for (String job : jobs) {
+        cache.removeContext(job);
+      }
+    }
+    cache.removeContext(workflow);
+  }
+}
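
The period-jump arithmetic in scheduleWorkflowIfReady above is easiest to see with numbers. The standalone snippet below reproduces it under assumed inputs (a 6-hour recurrence whose start time was 25 hours ago): 25h / 6h = 4 whole periods, so the clone is scheduled at startTime + 24h and named for that UTC timestamp:

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;
    import java.util.concurrent.TimeUnit;

    public class RecurrenceMath {
      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long period = TimeUnit.HOURS.toMillis(6);            // recurrence interval
        long startTime = now - TimeUnit.HOURS.toMillis(25);  // started 25h ago
        long delayFromStart = startTime - now;               // negative: start is in the past

        // Number of whole periods elapsed since the start time (integer division).
        long offsetMultiplier = (-delayFromStart) / period;          // 25h / 6h = 4
        long timeToSchedule = period * offsetMultiplier + startTime; // startTime + 24h

        // The cloned workflow is named after this time, formatted in UTC,
        // e.g. myWorkflow_20181030T011600 ("myWorkflow" is an assumed name).
        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println("myWorkflow_" + df.format(new Date(timeToSchedule)));
      }
    }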

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6851475..c913131 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,531 +19,43 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  * Custom rebalancer implementation for the {@code Workflow} in task state model.
  */
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = LoggerFactory.getLogger(WorkflowRebalancer.class);
-  private static final Set<TaskState> finalStates = new HashSet<>(
-      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT));
+  private WorkflowDispatcher _workflowDispatcher = new WorkflowDispatcher();
 
   @Override
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
     final String workflow = resource.getResourceName();
+    long startTime = System.currentTimeMillis();
     LOG.debug("Computer Best Partition for workflow: " + workflow);
-
-    // Fetch workflow configuration and context
+    _workflowDispatcher.init(_manager);
+    WorkflowContext workflowCtx = _workflowDispatcher
+        .getOrInitializeWorkflowContext(workflow, clusterData.getTaskDataCache());
     WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflow);
-    if (workflowCfg == null) {
-      LOG.warn("Workflow configuration is NULL for " + workflow);
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    WorkflowContext workflowCtx = getOrInitializeWorkflowContext(clusterData, workflow);
-
-    // Step 1: Check for deletion - if so, we don't need to go through further steps
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
-      cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    // Step 2: handle timeout, which should have higher priority than STOP
-    // Only generic workflow get timeouted and schedule rebalance for timeout. Will skip the set if
-    // the workflow already got timeouted. Job Queue will ignore the setup.
-    if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
-      // If timeout point has already been passed, it will not be scheduled
-      scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
-
-      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
-          && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
-        workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
-        clusterData.updateWorkflowContext(workflow, workflowCtx);
-      }
-
-      // We should not return after setting timeout, as in case the workflow is stopped already
-      // marking it timeout will not trigger rebalance pipeline as we are not listening on
-      // PropertyStore change, nor will we schedule rebalance for timeout as at this point,
-      // workflow is already timed-out. We should let the code proceed and wait for schedule
-      // future cleanup work
-    }
 
-    // Step 3: handle workflow that should STOP
-    // For workflows that already reached final states, STOP should not take into effect
-    if (!finalStates.contains(workflowCtx.getWorkflowState())
-        && TargetState.STOP.equals(targetState)) {
-      LOG.info("Workflow " + workflow + "is marked as stopped.");
-      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-        clusterData.updateWorkflowContext(workflow, workflowCtx);
-      }
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
+    _workflowDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
+    _workflowDispatcher.updateCache(clusterData.getTaskDataCache());
+    _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx);
+    _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx);
 
-    long currentTime = System.currentTimeMillis();
-
-    // Step 4: Check and process finished workflow context (confusing,
-    // but its inside isWorkflowFinished())
-    // Check if workflow has been finished and mark it if it is. Also update cluster status
-    // monitor if provided
-    // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed
-    // This is to handle TIMED_OUT only
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
-        workflowCfg, clusterData.getJobConfigMap(), clusterData)) {
-      workflowCtx.setFinishTime(currentTime);
-      updateWorkflowMonitor(workflowCtx, workflowCfg);
-      clusterData.updateWorkflowContext(workflow, workflowCtx);
-    }
-
-    // Step 5: Handle finished workflows
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow " + workflow + " is finished.");
-      long expiryTime = workflowCfg.getExpiry();
-      // Check if this workflow has been finished past its expiry.
-      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
-        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
-        cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
-      } else {
-        // schedule future cleanup work
-        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
-        _rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
-      }
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Workflow " + workflow + " is not ready to schedule");
-      // set the timer to trigger future schedule
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow,
-          workflowCfg.getStartTime().getTime());
-      return buildEmptyAssignment(workflow, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx, clusterData);
-    if (isReady) {
-      // Schedule jobs from this workflow.
-      scheduleJobs(workflow, workflowCfg, workflowCtx, clusterData.getJobConfigMap(), clusterData);
-    } else {
-      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
-    }
-
-    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
-      Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
-      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
-      if (jobWithFinalStates.size() > 0) {
-        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
-        workflowCtx.removeJobStates(jobWithFinalStates);
-        workflowCtx.removeJobStartTime(jobWithFinalStates);
-      }
-    }
-
-    clusterData.updateWorkflowContext(workflow, workflowCtx);
+    LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for workflow %s",
+        System.currentTimeMillis() - startTime, workflow));
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
-  private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterData,
-      String workflowName) {
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName);
-    if (workflowCtx == null) {
-      WorkflowConfig config = clusterData.getWorkflowConfig(workflowName);
-      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflowName);
-      LOG.debug("Workflow context is created for " + workflowName);
-    }
-    return workflowCtx;
-  }
-
-  /**
-   * Figure out whether the jobs in the workflow should be run,
-   * and if it's ready, then just schedule it
-   */
-  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    if (scheduleConfig != null && scheduleConfig.isRecurring()) {
-      LOG.debug("Jobs from recurring workflow are not schedule-able");
-      return;
-    }
-
-    int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
-    int scheduledJobs = 0;
-    long timeToSchedule = Long.MAX_VALUE;
-    for (String job : workflowCfg.getJobDag().getAllNodes()) {
-      TaskState jobState = workflowCtx.getJobState(job);
-      if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Job " + job + " is already started or completed.");
-        }
-        continue;
-      }
-
-      if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Workflow %s already have enough job in progress, "
-              + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
-        }
-        break;
-      }
-
-      // check ancestor job status
-      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
-          clusterDataCache)) {
-        JobConfig jobConfig = jobConfigMap.get(job);
-        if (jobConfig == null) {
-          LOG.error(String.format("The job config is missing for job %s", job));
-          continue;
-        }
-
-        // Since the start time is calculated base on the time of completion of parent jobs for this
-        // job, the calculated start time should only be calculate once. Persist the calculated time
-        // in WorkflowContext znode.
-        long calculatedStartTime = workflowCtx.getJobStartTime(job);
-        if (calculatedStartTime < 0) {
-          // Calculate the start time if it is not already calculated
-          calculatedStartTime = System.currentTimeMillis();
-          // If the start time is not calculated before, do the math.
-          if (jobConfig.getExecutionDelay() >= 0) {
-            calculatedStartTime += jobConfig.getExecutionDelay();
-          }
-          calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
-          workflowCtx.setJobStartTime(job, calculatedStartTime);
-        }
-
-        // Time is not ready. Set a trigger and update the start time.
-        if (System.currentTimeMillis() < calculatedStartTime) {
-          timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
-        } else {
-          scheduleSingleJob(job, jobConfig);
-          workflowCtx.setJobState(job, TaskState.NOT_STARTED);
-          scheduledJobs++;
-        }
-      }
-    }
-    long currentScheduledTime =
-        _rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
-            : _rebalanceScheduler.getRebalanceTime(workflow);
-    if (timeToSchedule < currentScheduledTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, timeToSchedule);
-    }
-  }
-
-  /**
-   * Posts new job to cluster
-   */
-  private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
-    HelixAdmin admin = _manager.getClusterManagmentTool();
-
-    IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
-    if (jobIS != null) {
-      LOG.info("Job " + jobResource + " idealstate already exists!");
-      return;
-    }
-
-    // Set up job resource based on partitions from target resource
-    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
-        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
-    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
-
-    int numPartitions = numIndependentTasks;
-    if (numPartitions == 0) {
-      IdealState targetIs =
-          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
-      if (targetIs == null) {
-        LOG.warn("Target resource does not exist for job " + jobResource);
-        // No need to fail here; the job will be marked as failed immediately once it starts
-        // running.
-      } else {
-        numPartitions = targetIs.getPartitionSet().size();
-      }
-    }
-
-    admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
-        TaskConstants.STATE_MODEL_NAME);
-
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Set the job configuration
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
-    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
-    // Push out new ideal state based on number of target partitions
-    IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
-    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
-    builder.setNumReplica(1);
-    builder.setNumPartitions(numPartitions);
-    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
-
-    if (jobConfig.getInstanceGroupTag() != null) {
-      builder.setNodeGroup(jobConfig.getInstanceGroupTag());
-    }
-
-    if (jobConfig.isDisableExternalView()) {
-      builder.disableExternalView();
-    }
-
-    jobIS = builder.build();
-    for (int i = 0; i < numPartitions; i++) {
-      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
-      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
-    }
-    jobIS.setRebalancerClassName(JobRebalancer.class.getName());
-    admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
-   * @param workflow the Helix resource associated with the workflow
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @return true if the workflow is ready for schedule, false if not ready
-   */
-  private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, ClusterDataCache cache) {
-    // non-scheduled workflow is ready to run immediately.
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
-          }
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ready to start workflow " + newWorkflowName);
-        }
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          if (clonedWf != null) {
-            try {
-              // Start the cloned workflow
-              driver.start(clonedWf);
-            } catch (Exception e) {
-              LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
-            }
-          }
-          // Persist workflow start regardless of success to avoid retrying and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        _rebalanceScheduler.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
-      } else {
-        // one time workflow.
-        // Remove any timers that are past-time for this workflow
-        long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
-        if (scheduledTime > 0 && currentTime > scheduledTime) {
-          _rebalanceScheduler.removeScheduledRebalance(workflow);
-        }
-        return true;
-      }
-    } else {
-      // set the timer to trigger future schedule
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, startTime.getTime());
-    }
-
-    return false;
-  }
-
-  /**
-   * Create a new workflow based on an existing one
-   * @param manager connection to Helix
-   * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired start time
-   * @return the cloned workflow, or null if there was a problem cloning the existing one
-   */
-  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
-      String newWorkflowName, Date newStartTime) {
-    // Read all resources, including the workflow and jobs of interest
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Map<String, HelixProperty> resourceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
-    if (!resourceConfigMap.containsKey(origWorkflowName)) {
-      LOG.error("No such workflow named " + origWorkflowName);
-      return null;
-    }
-    if (resourceConfigMap.containsKey(newWorkflowName)) {
-      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
-      return null;
-    }
 
-    // Create a new workflow with a new name
-    Map<String, String> workflowConfigsMap =
-        resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
-    WorkflowConfig.Builder workflowConfigBuilder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
-
-    // Set the schedule, if applicable
-    if (newStartTime != null) {
-      ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
-      workflowConfigBuilder.setScheduleConfig(scheduleConfig);
-    }
-    workflowConfigBuilder.setTerminable(true);
-
-    WorkflowConfig workflowConfig = workflowConfigBuilder.build();
-
-    JobDag jobDag = workflowConfig.getJobDag();
-    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
-
-    Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
-    workflowBuilder.setWorkflowConfig(workflowConfig);
-
-    // Add each job back as long as the original exists
-    Set<String> namespacedJobs = jobDag.getAllNodes();
-    for (String namespacedJob : namespacedJobs) {
-      if (resourceConfigMap.containsKey(namespacedJob)) {
-        // Copy over job-level and task-level configs
-        String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
-        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
-        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
-
-        JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
-
-        jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
-        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
-        List<TaskConfig> taskConfigs = Lists.newLinkedList();
-        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
-          TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
-          taskConfigs.add(taskConfig);
-        }
-        jobCfgBuilder.addTaskConfigs(taskConfigs);
-        workflowBuilder.addJob(job, jobCfgBuilder);
-
-        // Add dag dependencies
-        Set<String> children = parentsToChildren.get(namespacedJob);
-        if (children != null) {
-          for (String namespacedChild : children) {
-            String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
-            workflowBuilder.addParentChildDependency(job, child);
-          }
-        }
-      }
-    }
-    return workflowBuilder.build();
-  }
-
-  /**
-   * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
-   * context associated with this workflow, and all job information, including each job's config,
-   * context, IdealState and ExternalView.
-   */
-  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
-      TaskDataCache taskDataCache) {
-    LOG.info("Cleaning up workflow: " + workflow);
-
-    if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
-      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
-      // Remove all pending timer tasks for this workflow if exists
-      _rebalanceScheduler.removeScheduledRebalance(workflow);
-      for (String job : jobs) {
-        _rebalanceScheduler.removeScheduledRebalance(job);
-      }
-      if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
-          _manager.getHelixPropertyStore(), workflow, jobs)) {
-        LOG.warn("Failed to clean up workflow " + workflow);
-      } else {
-        // Only remove from the cache when the whole workflow removal succeeds. Otherwise, the
-        // batch write would clean up all the contexts even though the Configs and IdealStates
-        // still exist, and all the workflows and jobs would be rescheduled again.
-        removeContexts(workflow, jobs, taskDataCache);
-      }
-    } else {
-      LOG.info("Did not clean up workflow " + workflow
-          + " because neither the workflow is non-terminable nor is set to DELETE.");
-    }
-  }
-
-  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
-    if (jobs != null) {
-      for (String job : jobs) {
-        cache.removeContext(job);
-      }
-    }
-    cache.removeContext(workflow);
-  }
 
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,

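The scheduleJobs logic removed above computes each job's effective start time once and persists it in the WorkflowContext. Conceptually the computation reduces to the following sketch (illustrative names, not the exact Helix code):

    // Sketch: a job's effective start time, derived once and then persisted.
    // executionDelay is relative to the completion of the job's parent jobs;
    // executionStart is an absolute earliest-start timestamp from JobConfig.
    static long computeStartTime(long now, long executionDelay, long executionStart) {
      long start = now;
      if (executionDelay >= 0) {
        start += executionDelay;
      }
      return Math.max(start, executionStart);
    }

If the computed time is still in the future, the job is not dispatched; instead the earliest such time is handed to the rebalance scheduler as the next wake-up, which matches the timeToSchedule bookkeeping in the removed code.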
http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/main/scripts/integration-test/script/pexpect.py
----------------------------------------------------------------------
diff --git a/helix-core/src/main/scripts/integration-test/script/pexpect.py b/helix-core/src/main/scripts/integration-test/script/pexpect.py
index 6516cda..66c2d3c 100644
--- a/helix-core/src/main/scripts/integration-test/script/pexpect.py
+++ b/helix-core/src/main/scripts/integration-test/script/pexpect.py
@@ -355,12 +355,12 @@ class spawn (object):
         the input from the child and output sent to the child. Sometimes you
         don't want to see everything you write to the child. You only want to
         log what the child sends back. For example::
-        
+
             child = pexpect.spawn('some_command')
             child.logfile_read = sys.stdout
 
         To separately log output sent to the child use logfile_send::
-        
+
             self.logfile_send = fout
 
         The delaybeforesend helps overcome a weird behavior that many users
@@ -723,7 +723,7 @@ class spawn (object):
         if timeout == -1:
             timeout = self.timeout
         if timeout is not None:
-            end_time = time.time() + timeout 
+            end_time = time.time() + timeout
         while True:
             if not self.getecho():
                 return True
@@ -1374,7 +1374,7 @@ class spawn (object):
         if timeout == -1:
             timeout = self.timeout
         if timeout is not None:
-            end_time = time.time() + timeout 
+            end_time = time.time() + timeout
         if searchwindowsize == -1:
             searchwindowsize = self.searchwindowsize
 
@@ -1672,7 +1672,7 @@ class searcher_string (object):
         # rescanning until we've read three more bytes.
         #
         # Sadly, I don't know enough about this interesting topic. /grahn
-        
+
         for index, s in self._strings:
             if searchwindowsize is None:
                 # the match, if any, can only be in the fresh data,
@@ -1751,7 +1751,7 @@ class searcher_re (object):
         'buffer' which have not been searched before.
 
         See class spawn for the 'searchwindowsize' argument.
-        
+
         If there is a match this returns the index of that string, and sets
         'start', 'end' and 'match'. Otherwise, returns -1."""
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 431b929..7495078 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -240,7 +240,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     // Check that the workflow only started after the start time (with a 1 second buffer)
     WorkflowContext workflowCtx = _driver.getWorkflowContext(jobName);
     long startTime = workflowCtx.getStartTime();
-    Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
+    Assert.assertTrue(startTime <= inFiveSeconds);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/0c3ac37b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 361a672..5eba70a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -50,7 +50,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
     List<String> currentJobNames = createAndEnqueueJob(queueBuild, 2);
-
+    queueBuild.setExpiry(1);
     _driver.start(queueBuild.build());
 
     WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
@@ -61,20 +61,21 @@ public class TestRecurringJobQueue extends TaskTestBase {
     _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
-    _driver.delete(queueName);
-    Thread.sleep(500);
+    _driver.deleteAndWaitForCompletion(queueName, 5000);
 
     JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     currentJobNames = createAndEnqueueJob(queueBuilder, 2);
 
-    _driver.createQueue(queueBuilder.build());
-
-
-    wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+    _driver.start(queueBuilder.build());
 
     // ensure jobs are started and completed
-    scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    scheduledQueue = null;
+    while (scheduledQueue == null) {
+      wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    }
+
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
     _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
@@ -97,8 +98,13 @@ public class TestRecurringJobQueue extends TaskTestBase {
     List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5);
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    WorkflowContext wCtx = null;
+    String scheduledQueue = null;
+
+    while (scheduledQueue == null) {
+      wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    }
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);


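The updated TestRecurringJobQueue no longer assumes the recurrent queue schedules its first run before the first context poll; it loops until getLastScheduledSingleWorkflow() returns non-null. A bounded version of that loop, as a sketch (the timeout and sleep values are illustrative):

    // Sketch: wait for the first scheduled run of a recurrent queue, with a deadline.
    static String waitForScheduledQueue(TaskDriver driver, String queueName, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(driver, queueName);
        String scheduled = wCtx.getLastScheduledSingleWorkflow();
        if (scheduled != null) {
          return scheduled;
        }
        Thread.sleep(100L);
      }
      throw new IllegalStateException(
          "No run scheduled for " + queueName + " within " + timeoutMs + " ms");
    }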
[3/4] helix git commit: Using HelixZkClient to replace ZkClient in helix-core and helix-rest.

Posted by jx...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
index 7506e9b..cc8eef5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
@@ -32,7 +33,8 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.store.PropertyJsonSerializer;
@@ -69,7 +71,7 @@ public class TestDriver {
       new ConcurrentHashMap<String, TestInfo>();
 
   public static class TestInfo {
-    public final ZkClient _zkClient;
+    public final HelixZkClient _zkClient;
     public final String _clusterName;
     public final int _numDb;
     public final int _numPartitionsPerDb;
@@ -79,7 +81,7 @@ public class TestDriver {
     public final Map<String, HelixManager> _managers =
         new ConcurrentHashMap<String, HelixManager>();
 
-    public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
+    public TestInfo(String clusterName, HelixZkClient zkClient, int numDb, int numPartitionsPerDb,
         int numNode, int replica) {
       this._clusterName = clusterName;
       this._zkClient = zkClient;
@@ -116,47 +118,51 @@ public class TestDriver {
   public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
       int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
       throws Exception {
-    ZkClient zkClient = new ZkClient(zkAddr);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-
-    // String clusterName = CLUSTER_PREFIX + "_" + uniqClusterName;
-    String clusterName = uniqClusterName;
-    if (zkClient.exists("/" + clusterName)) {
-      LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName
-          + " is not unique or test has been run without cleaning up zk; deleting it");
-      zkClient.deleteRecursively("/" + clusterName);
-    }
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
 
-    if (_testInfoMap.containsKey(uniqClusterName)) {
-      LOG.warn("test info already exists:" + uniqClusterName
-          + " is not unique or test has been run without cleaning up test info map; removing it");
-      _testInfoMap.remove(uniqClusterName);
-    }
-    TestInfo testInfo =
-        new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource, numInstances,
-            replica);
-    _testInfoMap.put(uniqClusterName, testInfo);
+    try {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    ClusterSetup setupTool = new ClusterSetup(zkAddr);
-    setupTool.addCluster(clusterName, true);
+      // String clusterName = CLUSTER_PREFIX + "_" + uniqClusterName;
+      String clusterName = uniqClusterName;
+      if (zkClient.exists("/" + clusterName)) {
+        LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName + " is not unique or test has been run without cleaning up zk; deleting it");
+        zkClient.deleteRecursively("/" + clusterName);
+      }
 
-    for (int i = 0; i < numInstances; i++) {
-      int port = START_PORT + i;
-      setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + "_" + port);
-    }
+      if (_testInfoMap.containsKey(uniqClusterName)) {
+        LOG.warn("test info already exists:" + uniqClusterName + " is not unique or test has been run without cleaning up test info map; removing it");
+        _testInfoMap.remove(uniqClusterName);
+      }
+      TestInfo testInfo =
+          new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource, numInstances,
+              replica);
+      _testInfoMap.put(uniqClusterName, testInfo);
 
-    for (int i = 0; i < numResources; i++) {
-      String dbName = TEST_DB_PREFIX + i;
-      setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource, STATE_MODEL);
-      if (doRebalance) {
-        setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
-
-        // String idealStatePath = "/" + clusterName + "/" +
-        // PropertyType.IDEALSTATES.toString() + "/"
-        // + dbName;
-        // ZNRecord idealState = zkClient.<ZNRecord> readData(idealStatePath);
-        // testInfo._idealStateMap.put(dbName, idealState);
+      ClusterSetup setupTool = new ClusterSetup(zkAddr);
+      setupTool.addCluster(clusterName, true);
+
+      for (int i = 0; i < numInstances; i++) {
+        int port = START_PORT + i;
+        setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + "_" + port);
+      }
+
+      for (int i = 0; i < numResources; i++) {
+        String dbName = TEST_DB_PREFIX + i;
+        setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource, STATE_MODEL);
+        if (doRebalance) {
+          setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+
+          // String idealStatePath = "/" + clusterName + "/" +
+          // PropertyType.IDEALSTATES.toString() + "/"
+          // + dbName;
+          // ZNRecord idealState = zkClient.<ZNRecord> readData(idealStatePath);
+          // testInfo._idealStateMap.put(dbName, idealState);
+        }
       }
+    } finally {
+      zkClient.close();
     }
   }
 

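The shape of this change recurs throughout the commit: a factory-built HelixZkClient wrapped in try/finally so the handle is always released. Distilled to a sketch (cluster name and address are illustrative):

    // Sketch: acquire a shared HelixZkClient and guarantee it is closed.
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    try {
      zkClient.setZkSerializer(new ZNRecordSerializer());
      if (zkClient.exists("/" + clusterName)) {
        zkClient.deleteRecursively("/" + clusterName); // stale test cluster
      }
      // ... cluster setup work ...
    } finally {
      zkClient.close(); // releases this handle; the connection itself is shared
    }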
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
index d8069b4..daf24fb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
@@ -3,14 +3,15 @@ package org.apache.helix.integration;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -86,9 +87,11 @@ public class TestEnableCompression extends ZkTestBase {
     idealstate.getRecord().setBooleanField("enableCompression", true);
     _gSetupTool.getClusterManagementTool().addResource(clusterName, resourceName, idealstate);
 
-    ZkClient zkClient =
-        new ZkClient(ZK_ADDR, 60 * 1000, 60 * 1000, new BytesPushThroughSerializer());
-    zkClient.waitUntilConnected(10, TimeUnit.SECONDS);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new BytesPushThroughSerializer())
+        .setOperationRetryTimeout((long) (60 * 1000)).setConnectInitTimeout(60 * 1000);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
 
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
@@ -126,7 +129,7 @@ public class TestEnableCompression extends ZkTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
-  private void findCompressedZNodes(ZkClient zkClient, String path, List<String> compressedPaths) {
+  private void findCompressedZNodes(HelixZkClient zkClient, String path, List<String> compressedPaths) {
     List<String> children = zkClient.getChildren(path);
     if (children != null && children.size() > 0) {
       for (String child : children) {

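Note how connection tuning moves off the ZkClient constructor and onto HelixZkClient.ZkClientConfig, passed at build time. In isolation (timeout values taken from the diff above):

    // Sketch: serializer and timeouts supplied via ZkClientConfig.
    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
    clientConfig.setZkSerializer(new BytesPushThroughSerializer())
        .setOperationRetryTimeout((long) (60 * 1000))
        .setConnectInitTimeout(60 * 1000);
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);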
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
index 46e87c2..ecdd285 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEntropyFreeNodeBounce.java
@@ -188,7 +188,7 @@ public class TestEntropyFreeNodeBounce extends ZkUnitTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _gZkClient;
+      return (ZkClient) _gZkClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
index 7fe3424..7e27801 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -29,7 +29,6 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -77,16 +76,13 @@ public class TestPauseSignal extends ZkTestBase {
     Assert.assertTrue(result);
 
     // pause the cluster and make sure pause is persistent
-    ZkClient zkClient = new ZkClient(ZK_ADDR);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
     final HelixDataAccessor tmpAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
 
     String cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " false";
     ClusterSetup.processCommandLineArgs(cmd.split(" "));
 
     tmpAccessor.setProperty(tmpAccessor.keyBuilder().pause(), new PauseSignal("pause"));
-    zkClient.close();
 
     // wait for controller to be signaled by pause
     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index 63d87eb..2e98ae8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -35,7 +35,7 @@ import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.mock.participant.DummyProcess;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.IdealState;
@@ -426,8 +426,8 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
     }
 
     @Override
-    public ZkClient getZkClient() {
-      return (ZkClient) _zkclient;
+    public HelixZkClient getZkClient() {
+      return _zkclient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
index 5a8ee97..b7a6eb9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
@@ -23,12 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.testng.Assert;
@@ -39,10 +36,8 @@ public class TestStatusUpdate extends ZkStandAloneCMTestBase {
   // this test
   // @Test
   public void testParticipantStatusUpdates() throws Exception {
-    ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 0ffe8e0..33c52b1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -35,7 +35,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
@@ -512,7 +512,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     return sb.toString();
   }
 
-  void printZkListeners(ZkClient client) throws Exception {
+  void printZkListeners(HelixZkClient client) throws Exception {
     Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
     Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index 4947301..8473180 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -1,12 +1,13 @@
 package org.apache.helix.integration;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixException;
 import org.apache.helix.SystemPropertyKeys;
@@ -18,7 +19,8 @@ import org.apache.helix.integration.task.TaskTestBase;
 import org.apache.helix.integration.task.TaskTestUtil;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
@@ -40,17 +42,17 @@ public class TestZkConnectionLost extends TaskTestBase {
 
   private String _zkAddr = "localhost:21893";
   ClusterSetup _setupTool;
-  ZkClient _zkClient;
-
+  HelixZkClient _zkClient;
 
   @BeforeClass
   public void beforeClass() throws Exception {
     ZkServer zkServer = TestHelper.startZkServer(_zkAddr);
     _zkServerRef.set(zkServer);
-    _zkClient = new ZkClient(_zkAddr);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr));
     _zkClient.setZkSerializer(new ZNRecordSerializer());
     _setupTool = new ClusterSetup(_zkClient);
-    _participants =  new MockParticipantManager[_numNodes];
+    _participants = new MockParticipantManager[_numNodes];
     _setupTool.addCluster(CLUSTER_NAME, true);
     setupParticipants(_setupTool);
     setupDBs(_setupTool);

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 96f4a88..8e1b59c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -25,7 +25,7 @@ import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,8 +88,8 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
   }
 
   @Override
-  public ZkClient getZkClient() {
-    return (ZkClient) _zkclient;
+  public HelixZkClient getZkClient() {
+    return _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index b186a1a..0a49d58 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
 import org.slf4j.Logger;
@@ -82,8 +82,8 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
   }
 
   @Override
-  public ZkClient getZkClient() {
-    return (ZkClient) _zkclient;
+  public HelixZkClient getZkClient() {
+    return _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 362709a..d1677c6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
 import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -127,8 +127,8 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   }
 
   @Override
-  public ZkClient getZkClient() {
-    return (ZkClient) _zkclient;
+  public HelixZkClient getZkClient() {
+    return _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
index c189bad..d1de60a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
@@ -21,10 +21,10 @@ package org.apache.helix.integration.manager;
 
 import java.util.List;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 
 public interface ZkTestManager {
-  ZkClient getZkClient();
+  HelixZkClient getZkClient();
 
   List<CallbackHandler> getHandlers();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
index 7ad8358..6e48278 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -225,10 +226,12 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
     String _clusterName;
     String _resourceName;
+    HelixZkClient _client;
 
-    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
+    public ExternalViewBalancedVerifier(HelixZkClient client, String clusterName, String resourceName) {
       _clusterName = clusterName;
       _resourceName = resourceName;
+      _client = client;
     }
 
     @Override
@@ -274,7 +277,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _gZkClient;
+      return (ZkClient) _client;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
index 2527a66..99fa0ec 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRollbackException;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -33,6 +34,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -190,10 +192,12 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
     String _clusterName;
     String _resourceName;
+    HelixZkClient _client;
 
-    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
+    public ExternalViewBalancedVerifier(HelixZkClient client, String clusterName, String resourceName) {
       _clusterName = clusterName;
       _resourceName = resourceName;
+      _client = client;
     }
 
     @Override
@@ -217,7 +221,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _gZkClient;
+      return (ZkClient) _client;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
index ba6db12..c7c4e19 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
@@ -34,6 +34,7 @@ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateProperty;
@@ -109,11 +110,11 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
-    ZkClient _client;
+    HelixZkClient _client;
     String _clusterName;
     String _resourceName;
 
-    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
+    public ExternalViewBalancedVerifier(HelixZkClient client, String clusterName, String resourceName) {
       _client = client;
       _clusterName = clusterName;
       _resourceName = resourceName;
@@ -154,14 +155,13 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _client;
+      return (ZkClient) _client;
     }
 
     @Override
     public String getClusterName() {
       return _clusterName;
     }
-
   }
 
   static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount,

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
index b966f86..24e8a26 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
@@ -25,6 +25,8 @@ import com.google.common.collect.Sets;
 import java.util.Date;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.commons.codec.binary.Hex;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -37,15 +39,17 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.helix.util.ZKClientPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -332,7 +336,7 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
     private final String _resourceName;
     private final String[] _taggedNodes;
     private final boolean _isEmptyAllowed;
-    private final ZkClient _zkClient;
+    private final HelixZkClient _zkClient;
 
     /**
      * Create a verifier for a specific cluster and resource
@@ -347,7 +351,10 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
       _resourceName = resourceName;
       _taggedNodes = taggedNodes;
       _isEmptyAllowed = isEmptyAllowed;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+
+      _zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
     }
 
     @Override
@@ -477,7 +484,7 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _zkClient;
+      return (ZkClient) _zkClient;
     }
 
     @Override

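TestFullAutoNodeTagging is the one place in this commit that picks DedicatedZkClientFactory over SharedZkClientFactory, presumably because the verifier benefits from owning its ZooKeeper connection rather than sharing one. A sketch of the dedicated variant (usage is illustrative):

    // Sketch: a dedicated client owns its ZooKeeper connection outright.
    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
    zkClient.setZkSerializer(new ZNRecordSerializer());
    // ... watch-heavy or long-lived verification work ...
    zkClient.close(); // tears down the dedicated connection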
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index 634728e..bccb425 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -26,6 +26,8 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -44,8 +46,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
         + new Date(System.currentTimeMillis()));
 
     ZNRecordSerializer serializer = new ZNRecordSerializer();
-    ZkClient zkClient = new ZkClient(ZK_ADDR);
-    zkClient.setZkSerializer(serializer);
+
     String root = className;
     byte[] buf = new byte[1024];
     for (int i = 0; i < 1024; i++) {
@@ -63,10 +64,10 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     }
 
     String path1 = "/" + root + "/test1";
-    zkClient.createPersistent(path1, true);
-    zkClient.writeData(path1, smallRecord);
+    _gZkClient.createPersistent(path1, true);
+    _gZkClient.writeData(path1, smallRecord);
 
-    ZNRecord record = zkClient.readData(path1);
+    ZNRecord record = _gZkClient.readData(path1);
     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
 
     // oversized data doesn't create any data on zk
@@ -77,36 +78,36 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       largeRecord.setSimpleField(i + "", bufStr);
     }
     String path2 = "/" + root + "/test2";
-    zkClient.createPersistent(path2, true);
+    _gZkClient.createPersistent(path2, true);
     try {
-      zkClient.writeData(path2, largeRecord);
+      _gZkClient.writeData(path2, largeRecord);
     } catch (HelixException e) {
       Assert.fail("Should not fail because data size is larger than 1M since compression applied");
     }
-    record = zkClient.readData(path2);
+    record = _gZkClient.readData(path2);
     Assert.assertNotNull(record);
 
     // oversized write doesn't overwrite existing data on zk
-    record = zkClient.readData(path1);
+    record = _gZkClient.readData(path1);
     try {
-      zkClient.writeData(path1, largeRecord);
+      _gZkClient.writeData(path1, largeRecord);
     } catch (HelixException e) {
       Assert.fail("Should not fail because data size is larger than 1M since compression applied");
     }
-    ZNRecord recordNew = zkClient.readData(path1);
+    ZNRecord recordNew = _gZkClient.readData(path1);
     byte[] arr = serializer.serialize(record);
     byte[] arrNew = serializer.serialize(recordNew);
     Assert.assertFalse(Arrays.equals(arr, arrNew));
 
     // test ZkDataAccessor
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+    ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
     admin.addCluster(className, true);
     InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
     admin.addInstance(className, instanceConfig);
 
     // oversized data should not create any new data on zk
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
+        new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     IdealState idealState = new IdealState("currentState");
@@ -161,122 +162,131 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
   @Test
   public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
     String className = getShortClassName();
-    System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
+        System.currentTimeMillis()));
 
     ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
-    ZkClient zkClient = new ZkClient(ZK_ADDR);
-    zkClient.setZkSerializer(serializer);
-    String root = className;
-    byte[] buf = new byte[1024];
-    for (int i = 0; i < 1024; i++) {
-      buf[i] = 'a';
-    }
-    String bufStr = new String(buf);
-
-    // test zkClient
-    // legal-sized data gets written to zk
-    // write a znode of size less than 1m
-    final ZNRecord smallRecord = new ZNRecord("normalsize");
-    smallRecord.getSimpleFields().clear();
-    for (int i = 0; i < 900; i++) {
-      smallRecord.setSimpleField(i + "", bufStr);
-    }
-
-    String path1 = "/" + root + "/test1";
-    zkClient.createPersistent(path1, true);
-    zkClient.writeData(path1, smallRecord);
-
-    ZNRecord record = zkClient.readData(path1);
-    Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
-    // oversized data doesn't create any data on zk
-    // prepare a znode of size larger than 1m
-    final ZNRecord largeRecord = new ZNRecord("oversize");
-    largeRecord.getSimpleFields().clear();
-    for (int i = 0; i < 1024; i++) {
-      largeRecord.setSimpleField(i + "", bufStr);
-    }
-    String path2 = "/" + root + "/test2";
-    zkClient.createPersistent(path2, true);
-    try {
-      zkClient.writeData(path2, largeRecord);
-    } catch (HelixException e) {
-      Assert.fail("Should not fail because data size is larger than 1M since compression applied");
-    }
-    record = zkClient.readData(path2);
-    Assert.assertNotNull(record);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
 
-    // oversized write doesn't overwrite existing data on zk
-    record = zkClient.readData(path1);
     try {
-      zkClient.writeData(path1, largeRecord);
-    } catch (HelixException e) {
+      zkClient.setZkSerializer(serializer);
+      String root = className;
+      byte[] buf = new byte[1024];
+      for (int i = 0; i < 1024; i++) {
+        buf[i] = 'a';
+      }
+      String bufStr = new String(buf);
+
+      // test zkClient
+      // legal-sized data gets written to zk
+      // write a znode of size less than 1m
+      final ZNRecord smallRecord = new ZNRecord("normalsize");
+      smallRecord.getSimpleFields().clear();
+      for (int i = 0; i < 900; i++) {
+        smallRecord.setSimpleField(i + "", bufStr);
+      }
+
+      String path1 = "/" + root + "/test1";
+      zkClient.createPersistent(path1, true);
+      zkClient.writeData(path1, smallRecord);
+
+      ZNRecord record = zkClient.readData(path1);
+      Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+      // oversized data doesn't create any data on zk
+      // prepare a znode of size larger than 1m
+      final ZNRecord largeRecord = new ZNRecord("oversize");
+      largeRecord.getSimpleFields().clear();
+      for (int i = 0; i < 1024; i++) {
+        largeRecord.setSimpleField(i + "", bufStr);
+      }
+      String path2 = "/" + root + "/test2";
+      zkClient.createPersistent(path2, true);
+      try {
+        zkClient.writeData(path2, largeRecord);
+      } catch (HelixException e) {
+        Assert
+            .fail("Should not fail because data size is larger than 1M since compression applied");
+      }
+      record = zkClient.readData(path2);
+      Assert.assertNotNull(record);
+
+      // oversized write doesn't overwrite existing data on zk
+      record = zkClient.readData(path1);
+      try {
+        zkClient.writeData(path1, largeRecord);
+      } catch (HelixException e) {
+        Assert
+            .fail("Should not fail because data size is larger than 1M since compression applied");
+      }
+      ZNRecord recordNew = zkClient.readData(path1);
+      byte[] arr = serializer.serialize(record);
+      byte[] arrNew = serializer.serialize(recordNew);
+      Assert.assertFalse(Arrays.equals(arr, arrNew));
+
+      // test ZkDataAccessor
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+      admin.addCluster(className, true);
+      InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+      admin.addInstance(className, instanceConfig);
+
+      // oversized data should not create any new data on zk
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      // ZNRecord statusUpdates = new ZNRecord("statusUpdates");
+      IdealState idealState = new IdealState("currentState");
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+
+      for (int i = 0; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(i + "", bufStr);
+      }
+      boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
+      Assert.assertTrue(succeed);
+      HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
+      Assert.assertNotNull(property);
+
+      // legal sized data gets written to zk
+      idealState.getRecord().getSimpleFields().clear();
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+
+      for (int i = 0; i < 900; i++) {
+        idealState.getRecord().setSimpleField(i + "", bufStr);
+      }
+      succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_2"), idealState);
+      Assert.assertTrue(succeed);
+      record = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
+      Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
+
+      // oversized data should not update existing data on zk
+      idealState.getRecord().getSimpleFields().clear();
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+
+      for (int i = 900; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(i + "", bufStr);
+      }
+      // System.out.println("record: " + idealState.getRecord());
+      succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
+      Assert.assertTrue(succeed);
+      recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
+      arr = serializer.serialize(record);
+      arrNew = serializer.serialize(recordNew);
+      Assert.assertFalse(Arrays.equals(arr, arrNew));
+    } catch (HelixException ex) {
       Assert.fail("Should not fail because data size is larger than 1M since compression applied");
+    } finally {
+      zkClient.close();
     }
-    ZNRecord recordNew = zkClient.readData(path1);
-    byte[] arr = serializer.serialize(record);
-    byte[] arrNew = serializer.serialize(recordNew);
-    Assert.assertFalse(Arrays.equals(arr, arrNew));
-
-    // test ZkDataAccessor
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
-    admin.addCluster(className, true);
-    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
-    admin.addInstance(className, instanceConfig);
-
-    // oversized data should not create any new data on zk
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    // ZNRecord statusUpdates = new ZNRecord("statusUpdates");
-    IdealState idealState = new IdealState("currentState");
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
-
-    for (int i = 0; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(i + "", bufStr);
-    }
-    boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
-    Assert.assertTrue(succeed);
-    HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
-    Assert.assertNotNull(property);
-
-    // legal sized data gets written to zk
-    idealState.getRecord().getSimpleFields().clear();
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
-
-    for (int i = 0; i < 900; i++) {
-      idealState.getRecord().setSimpleField(i + "", bufStr);
-    }
-    succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_2"), idealState);
-    Assert.assertTrue(succeed);
-    record = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
-    Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
-
-    // oversized data should not update existing data on zk
-    idealState.getRecord().getSimpleFields().clear();
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
-
-    for (int i = 900; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(i + "", bufStr);
-    }
-    // System.out.println("record: " + idealState.getRecord());
-    succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
-    Assert.assertTrue(succeed);
-    recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
-    arr = serializer.serialize(record);
-    arrNew = serializer.serialize(recordNew);
-    Assert.assertFalse(Arrays.equals(arr, arrNew));
-
-    System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
-        + new Date(System.currentTimeMillis()));
 
+    System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
+        System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
index 424d7b4..26444e3 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java
@@ -30,6 +30,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordUpdater;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -42,7 +44,8 @@ public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     // init external base data accessor
-    ZkClient extZkclient = new ZkClient(ZK_ADDR);
+    HelixZkClient extZkclient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
     extZkclient.setZkSerializer(new ZNRecordSerializer());
     ZkBaseDataAccessor<ZNRecord> extBaseAccessor = new ZkBaseDataAccessor<ZNRecord>(extZkclient);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
index 83cccc6..68d7a24 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
@@ -27,12 +27,15 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordUpdater;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.HelixPropertyListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -76,9 +79,10 @@ public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     // init external base data accessor
-    ZkClient zkclient = new ZkClient(ZK_ADDR);
+    HelixZkClient zkclient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
     zkclient.setZkSerializer(new ZNRecordSerializer());
-    ZkBaseDataAccessor<ZNRecord> extBaseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
+    ZkBaseDataAccessor<ZNRecord> extBaseAccessor = new ZkBaseDataAccessor<>(zkclient);
 
     // init zkCacheDataAccessor
     String curStatePath = PropertyPathBuilder.instanceCurrentState(clusterName, "localhost_8901");

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index 0534812..f734502 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -20,9 +20,7 @@ package org.apache.helix.manager.zk;
  */
 
 import java.util.Date;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.SystemPropertyKeys;
@@ -42,82 +40,6 @@ import org.testng.annotations.Test;
 public class TestZkFlapping extends ZkUnitTestBase {
   private final int _disconnectThreshold = 5;
 
-  @Test
-  public void testZkSessionExpiry() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-
-    String path = String.format("/%s", clusterName);
-    client.createEphemeral(path);
-    String oldSessionId = ZkTestHelper.getSessionId(client);
-    ZkTestHelper.expireSession(client);
-    String newSessionId = ZkTestHelper.getSessionId(client);
-    Assert.assertNotSame(newSessionId, oldSessionId);
-    Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
-    client.close();
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test
-  public void testCloseZkClient() {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-    String path = String.format("/%s", clusterName);
-    client.createEphemeral(path);
-
-    client.close();
-    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
-        + " should be removed after ZkClient#close()");
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test
-  public void testCloseZkClientInZkClientEventThread() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    final CountDownLatch waitCallback = new CountDownLatch(1);
-    final ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-    String path = String.format("/%s", clusterName);
-    client.createEphemeral(path);
-    client.subscribeDataChanges(path, new IZkDataListener() {
-
-      @Override
-      public void handleDataDeleted(String dataPath) throws Exception {
-      }
-
-      @Override
-      public void handleDataChange(String dataPath, Object data) throws Exception {
-        client.close();
-        waitCallback.countDown();
-      }
-    });
-
-    client.writeData(path, new ZNRecord("test"));
-    waitCallback.await();
-    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
-        + " should be removed after ZkClient#close() in its own event-thread");
-
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-  }
-
   class ZkStateCountListener implements IZkStateListener {
     int count = 0;
 
@@ -165,7 +87,7 @@ public class TestZkFlapping extends ZkUnitTestBase {
           new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participant.syncStart();
 
-      final ZkClient client = participant.getZkClient();
+      final ZkClient client = (ZkClient) participant.getZkClient();
       final ZkStateCountListener listener = new ZkStateCountListener();
       client.subscribeStateChanges(listener);
 
@@ -249,7 +171,7 @@ public class TestZkFlapping extends ZkUnitTestBase {
           new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
       controller.syncStart();
 
-      final ZkClient client = controller.getZkClient();
+      final ZkClient client = (ZkClient) controller.getZkClient();
       final ZkStateCountListener listener = new ZkStateCountListener();
       client.subscribeStateChanges(listener);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
index 691623e..4d8ea99 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -19,10 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.ZkServer;
@@ -32,11 +28,10 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -44,7 +39,6 @@ import org.testng.annotations.Test;
 
 public class TestZkReconnect {
   private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
-  ExecutorService _executor = Executors.newSingleThreadExecutor();
 
   @Test
   public void testHelixManagerStateListenerCallback() throws Exception {
@@ -95,7 +89,7 @@ public class TestZkReconnect {
       // 1. shutdown zkServer and check if handler trigger callback
       zkServer.shutdown();
       // Simulate a retry in ZkClient that will not succeed
-      injectExpire((ZkClient) controller._zkclient);
+      ZkTestHelper.injectExpire(controller._zkclient);
       Assert.assertFalse(controller._zkclient.waitUntilConnected(5000, TimeUnit.MILLISECONDS));
       // While retrying, onDisconnectedFlag = false
       Assert.assertFalse(onDisconnectedFlag.get());
@@ -142,17 +136,4 @@ public class TestZkReconnect {
       System.clearProperty(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT);
     }
   }
-
-  private void injectExpire(final ZkClient zkClient)
-      throws ExecutionException, InterruptedException {
-    Future future = _executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        WatchedEvent event =
-            new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-        zkClient.process(event);
-      }
-    });
-    future.get();
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
index 5026999..47428c6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
@@ -6,8 +6,9 @@ import java.util.Map;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 
-public class MockZkClient extends ZkClient {
+public class MockZkClient extends ZkClient implements HelixZkClient {
   Map<String, byte[]> _dataMap;
 
   public MockZkClient(String zkAddress) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
index ff6e44e..02b1247 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
@@ -27,13 +27,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
@@ -45,14 +48,15 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 public class MockController {
-  private final ZkClient client;
+  private final HelixZkClient client;
   private final String srcName;
   private final String clusterName;
 
   public MockController(String src, String zkServer, String cluster) {
     srcName = src;
     clusterName = cluster;
-    client = new ZkClient(zkServer);
+    client = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkServer));
     client.setZkSerializer(new ZNRecordSerializer());
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 4dc3957..8eb9a2b 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -45,7 +45,7 @@ import org.apache.helix.api.listeners.ScopedConfigChangeListener;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -57,7 +57,7 @@ public class MockZKHelixManager implements HelixManager {
   private final InstanceType _type;
 
   public MockZKHelixManager(String clusterName, String instanceName, InstanceType type,
-      ZkClient zkClient) {
+      HelixZkClient zkClient) {
     _instanceName = instanceName;
     _clusterName = clusterName;
     _type = type;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
index c4c140e..280c885 100644
--- a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
@@ -36,7 +36,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
@@ -341,7 +341,7 @@ public class TestZkHelixPropertyStore extends ZkUnitTestBase {
     }
   }
 
-  private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp) {
+  private void setNodes(HelixZkClient zkClient, String root, char c, boolean needTimestamp) {
     char[] data = new char[bufSize];
 
     for (int i = 0; i < bufSize; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index ec80e64..40f814e 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -19,7 +19,6 @@ package org.apache.helix.tools;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
 import org.apache.helix.BaseDataAccessor;
@@ -35,7 +34,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
@@ -57,7 +55,6 @@ public class TestClusterSetup extends ZkUnitTestBase {
   protected static final String STATE_MODEL = "MasterSlave";
   protected static final String TEST_NODE = "testnode_1";
 
-  ZkClient _zkClient;
   ClusterSetup _clusterSetup;
 
   private static String[] createArgs(String str) {
@@ -67,24 +64,20 @@ public class TestClusterSetup extends ZkUnitTestBase {
   }
 
   @BeforeClass()
-  public void beforeClass() throws IOException, Exception {
+  public void beforeClass() throws Exception {
     System.out.println("START TestClusterSetup.beforeClass() "
         + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
   }
 
   @AfterClass()
   public void afterClass() {
-    _zkClient.close();
     System.out.println("END TestClusterSetup.afterClass() " + new Date(System.currentTimeMillis()));
   }
 
   @BeforeMethod()
   public void setup() {
 
-    _zkClient.deleteRecursively("/" + CLUSTER_NAME);
+    _gZkClient.deleteRecursively("/" + CLUSTER_NAME);
     _clusterSetup = new ClusterSetup(ZK_ADDR);
     _clusterSetup.addCluster(CLUSTER_NAME, true);
   }
@@ -123,11 +116,11 @@ public class TestClusterSetup extends ZkUnitTestBase {
 
     // verify instances
     for (String instance : instanceAddresses) {
-      verifyInstance(_zkClient, CLUSTER_NAME, instance, true);
+      verifyInstance(_gZkClient, CLUSTER_NAME, instance, true);
     }
 
     _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
-    verifyInstance(_zkClient, CLUSTER_NAME, nextInstanceAddress, true);
+    verifyInstance(_gZkClient, CLUSTER_NAME, nextInstanceAddress, true);
     // re-add
     boolean caughtException = false;
     try {
@@ -160,11 +153,11 @@ public class TestClusterSetup extends ZkUnitTestBase {
     // disable
     _clusterSetup.getClusterManagementTool().enableInstance(CLUSTER_NAME, nextInstanceAddress,
         false);
-    verifyEnabled(_zkClient, CLUSTER_NAME, nextInstanceAddress, false);
+    verifyEnabled(_gZkClient, CLUSTER_NAME, nextInstanceAddress, false);
 
     // drop
     _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
-    verifyInstance(_zkClient, CLUSTER_NAME, nextInstanceAddress, false);
+    verifyInstance(_gZkClient, CLUSTER_NAME, nextInstanceAddress, false);
 
     // re-drop
     caughtException = false;
@@ -206,15 +199,15 @@ public class TestClusterSetup extends ZkUnitTestBase {
       _clusterSetup.addResourceToCluster(CLUSTER_NAME, TEST_DB, 16, STATE_MODEL);
     } catch (Exception e) {
     }
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+    verifyResource(_gZkClient, CLUSTER_NAME, TEST_DB, true);
   }
 
   @Test()
   public void testRemoveResource() throws Exception {
     _clusterSetup.setupTestCluster(CLUSTER_NAME);
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+    verifyResource(_gZkClient, CLUSTER_NAME, TEST_DB, true);
     _clusterSetup.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, false);
+    verifyResource(_gZkClient, CLUSTER_NAME, TEST_DB, false);
   }
 
   @Test()
@@ -223,7 +216,7 @@ public class TestClusterSetup extends ZkUnitTestBase {
     // testAddInstancesToCluster();
     testAddResource();
     _clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 4);
-    verifyReplication(_zkClient, CLUSTER_NAME, TEST_DB, 4);
+    verifyReplication(_gZkClient, CLUSTER_NAME, TEST_DB, 4);
   }
 
   /*
@@ -238,33 +231,33 @@ public class TestClusterSetup extends ZkUnitTestBase {
     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+ " help"));
 
     // wipe ZK
-    _zkClient.deleteRecursively("/" + CLUSTER_NAME);
+    _gZkClient.deleteRecursively("/" + CLUSTER_NAME);
     _clusterSetup = new ClusterSetup(ZK_ADDR);
 
     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addCluster "
         + CLUSTER_NAME));
 
     // wipe again
-    _zkClient.deleteRecursively("/" + CLUSTER_NAME);
+    _gZkClient.deleteRecursively("/" + CLUSTER_NAME);
     _clusterSetup = new ClusterSetup(ZK_ADDR);
 
     _clusterSetup.setupTestCluster(CLUSTER_NAME);
 
     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addNode "
         + CLUSTER_NAME + " " + TEST_NODE));
-    verifyInstance(_zkClient, CLUSTER_NAME, TEST_NODE, true);
+    verifyInstance(_gZkClient, CLUSTER_NAME, TEST_NODE, true);
     try {
       ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addResource "
           + CLUSTER_NAME + " " + TEST_DB + " 4 " + STATE_MODEL));
     } catch (Exception e) {
 
     }
-    verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
+    verifyResource(_gZkClient, CLUSTER_NAME, TEST_DB, true);
     // ClusterSetup
     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --addNode node-1"));
     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --enableInstance "
         + CLUSTER_NAME + " " + TEST_NODE + " true"));
-    verifyEnabled(_zkClient, CLUSTER_NAME, TEST_NODE, true);
+    verifyEnabled(_gZkClient, CLUSTER_NAME, TEST_NODE, true);
 
     // TODO: verify list commands
     /*
@@ -291,7 +284,7 @@ public class TestClusterSetup extends ZkUnitTestBase {
     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --rebalance "+CLUSTER_NAME+" "+TEST_DB+" 1"));
     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --enableInstance "
         + CLUSTER_NAME + " " + TEST_NODE + " false"));
-    verifyEnabled(_zkClient, CLUSTER_NAME, TEST_NODE, false);
+    verifyEnabled(_gZkClient, CLUSTER_NAME, TEST_NODE, false);
     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --dropNode "
         + CLUSTER_NAME + " " + TEST_NODE));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 02e74c5..a66e8dd 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -32,12 +32,14 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.tools.ClusterSetup;
 
 public class ServerContext {
   private final String _zkAddr;
-  private ZkClient _zkClient;
+  private HelixZkClient _zkClient;
   private ZKHelixAdmin _zkHelixAdmin;
   private ClusterSetup _clusterSetup;
   private ConfigAccessor _configAccessor;
@@ -59,24 +61,31 @@ public class ServerContext {
     _taskDriverPool = new HashMap<>();
   }
 
-  public ZkClient getZkClient() {
+  public HelixZkClient getHelixZkClient() {
     if (_zkClient == null) {
-      _zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+      clientConfig.setZkSerializer(new ZNRecordSerializer());
+      _zkClient = SharedZkClientFactory
+          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
     }
     return _zkClient;
   }
 
+  @Deprecated
+  public ZkClient getZkClient() {
+    return (ZkClient) getHelixZkClient();
+  }
+
   public HelixAdmin getHelixAdmin() {
     if (_zkHelixAdmin == null) {
-      _zkHelixAdmin = new ZKHelixAdmin(getZkClient());
+      _zkHelixAdmin = new ZKHelixAdmin(getHelixZkClient());
     }
     return _zkHelixAdmin;
   }
 
   public ClusterSetup getClusterSetup() {
     if (_clusterSetup == null) {
-      _clusterSetup = new ClusterSetup(getZkClient(), getHelixAdmin());
+      _clusterSetup = new ClusterSetup(getHelixZkClient(), getHelixAdmin());
     }
     return _clusterSetup;
   }
@@ -84,7 +93,7 @@ public class ServerContext {
   public TaskDriver getTaskDriver(String clusterName) {
     synchronized (_taskDriverPool) {
       if (!_taskDriverPool.containsKey(clusterName)) {
-        _taskDriverPool.put(clusterName, new TaskDriver(getZkClient(), clusterName));
+        _taskDriverPool.put(clusterName, new TaskDriver(getHelixZkClient(), clusterName));
       }
       return _taskDriverPool.get(clusterName);
     }
@@ -92,7 +101,7 @@ public class ServerContext {
 
   public ConfigAccessor getConfigAccessor() {
     if (_configAccessor == null) {
-      _configAccessor = new ConfigAccessor(getZkClient());
+      _configAccessor = new ConfigAccessor(getHelixZkClient());
     }
     return _configAccessor;
   }
@@ -100,7 +109,7 @@ public class ServerContext {
   public HelixDataAccessor getDataAccssor(String clusterName) {
     synchronized (_helixDataAccessorPool) {
       if (!_helixDataAccessorPool.containsKey(clusterName)) {
-        ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(getZkClient());
+        ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(getHelixZkClient());
         _helixDataAccessorPool.put(clusterName,
             new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index dca0852..9b8eabe 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -30,17 +30,8 @@ import javax.ws.rs.core.Application;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.rest.common.ContextPropertyKeys;
-import org.apache.helix.rest.server.ServerContext;
 import org.apache.helix.rest.server.auditlog.AuditLog;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.tools.ClusterSetup;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index 1eac9c2..90bd256 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -25,6 +25,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.server.ServerContext;
 import org.apache.helix.rest.server.resources.AbstractResource;
@@ -39,9 +40,14 @@ import org.apache.helix.tools.ClusterSetup;
  */
 public class AbstractHelixResource extends AbstractResource{
 
-  public ZkClient getZkClient() {
+  public HelixZkClient getHelixZkClient() {
     ServerContext serverContext = getServerContext();
-    return serverContext.getZkClient();
+    return serverContext.getHelixZkClient();
+  }
+
+  @Deprecated
+  public ZkClient getZkClient() {
+    return (ZkClient) getHelixZkClient();
   }
 
   public HelixAdmin getHelixAdmin() {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index f6f95b0..3892fc6 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -40,7 +40,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.LeaderHistory;
@@ -391,7 +391,7 @@ public class ClusterAccessor extends AbstractHelixResource {
   }
 
   private boolean isClusterExist(String cluster) {
-    ZkClient zkClient = getZkClient();
+    HelixZkClient zkClient = getHelixZkClient();
     if (ZKUtil.isClusterSetup(cluster, zkClient)) {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index 5eb8a4c..a54cd8d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -41,7 +41,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
@@ -75,7 +75,7 @@ public class ResourceAccessor extends AbstractHelixResource {
     ObjectNode root = JsonNodeFactory.instance.objectNode();
     root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
 
-    ZkClient zkClient = getZkClient();
+    HelixZkClient zkClient = getHelixZkClient();
 
     ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
     ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
@@ -106,7 +106,7 @@ public class ResourceAccessor extends AbstractHelixResource {
   @Path("health")
   public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
 
-    ZkClient zkClient = getZkClient();
+    HelixZkClient zkClient = getHelixZkClient();
 
     List<String> resourcesInIdealState = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
     List<String> resourcesInExternalView = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
@@ -392,4 +392,4 @@ public class ResourceAccessor extends AbstractHelixResource {
     }
     return partitionHealthResult;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 0f7a43d..2596f30 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -31,7 +31,8 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TaskTestUtil;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.common.HelixRestNamespace;
 import org.apache.helix.rest.server.auditlog.AuditLog;
@@ -82,7 +83,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   protected static int NUM_PARTITIONS = 10;
   protected static int NUM_REPLICA = 3;
   protected static ZkServer _zkServer;
-  protected static ZkClient _gZkClient;
+  protected static HelixZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
   protected static ConfigAccessor _configAccessor;
   protected static BaseDataAccessor<ZNRecord> _baseAccessor;
@@ -93,7 +94,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   protected static ZkServer _zkServerTestNS;
   protected static final String _zkAddrTestNS = "localhost:2124";
   protected static final String TEST_NAMESPACE = "test-namespace";
-  protected static ZkClient _gZkClientTestNS;
+  protected static HelixZkClient _gZkClientTestNS;
   protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
 
   protected static Set<String> _clusters;
@@ -202,10 +203,16 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
       topJavaLogger.setLevel(Level.WARNING);
 
-      _gZkClient = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-      _gZkClientTestNS = new ZkClient(_zkAddrTestNS, ZkClient.DEFAULT_SESSION_TIMEOUT,
-          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+
+      clientConfig.setZkSerializer(new ZNRecordSerializer());
+      _gZkClient = DedicatedZkClientFactory
+          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
+
+      clientConfig.setZkSerializer(new ZNRecordSerializer());
+      _gZkClientTestNS = DedicatedZkClientFactory
+          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);
+
       _gSetupTool = new ClusterSetup(_gZkClient);
       _configAccessor = new ConfigAccessor(_gZkClient);
       _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);


[4/4] helix git commit: Using HelixZkClient to replace ZkClient in helix-core and helix-rest.

Posted by jx...@apache.org.
Using HelixZkClient to replace ZkClient in helix-core and helix-rest.

1. Replace as many usages as possible; tests that exercise the raw ZkClient keep their existing usages.
2. For backward compatibility, some public interfaces still return ZkClient; these are marked @Deprecated.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9d7364d7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9d7364d7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9d7364d7

Branch: refs/heads/master
Commit: 9d7364d7abba3932a1b25e96e4eb9dd3e203cec9
Parents: 01076ca
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Wed Sep 26 11:39:42 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 18:15:22 2018 -0700

----------------------------------------------------------------------
 .../controller/HierarchicalDataHolder.java      |  10 +-
 .../examples/IdealStateBuilderExample.java      |  12 +-
 .../helix/examples/IdealStateExample.java       |  14 +-
 .../helix/manager/zk/CallbackHandler.java       |  10 +-
 .../helix/manager/zk/ParticipantManager.java    |   2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java |  12 +-
 .../helix/manager/zk/zookeeper/ZkClient.java    |   4 +
 .../java/org/apache/helix/task/TaskDriver.java  |   8 +-
 .../tools/ClusterExternalViewVerifier.java      |   4 +-
 .../helix/tools/ClusterLiveNodesVerifier.java   |   6 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  13 +-
 .../helix/tools/ClusterStateVerifier.java       |  33 ++-
 .../org/apache/helix/tools/ClusterVerifier.java |   6 +-
 .../BestPossibleExternalViewVerifier.java       |  13 +-
 .../ClusterLiveNodesVerifier.java               |   6 +-
 .../StrictMatchExternalViewVerifier.java        |  14 +-
 .../ZkHelixClusterVerifier.java                 |  19 +-
 .../org/apache/helix/tools/TestExecutor.java    |  13 +-
 .../tools/commandtools/IntegrationTestUtil.java |  19 +-
 .../org/apache/helix/util/ZKClientPool.java     |   2 +-
 .../test/java/org/apache/helix/TestHelper.java  |  33 ++-
 .../apache/helix/TestHierarchicalDataStore.java |  24 +-
 .../java/org/apache/helix/TestZKCallback.java   |   2 -
 .../test/java/org/apache/helix/TestZkBasis.java |  88 ++++++-
 .../java/org/apache/helix/TestZnodeModify.java  |  21 +-
 .../java/org/apache/helix/ZkTestHelper.java     |  68 +++--
 .../org/apache/helix/common/ZkTestBase.java     |  54 ++--
 .../TestCorrectnessOnConnectivityLoss.java      |  18 +-
 .../apache/helix/integration/TestDriver.java    |  84 +++---
 .../integration/TestEnableCompression.java      |  15 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   2 +-
 .../helix/integration/TestPauseSignal.java      |   6 +-
 .../integration/TestResourceGroupEndtoEnd.java  |   6 +-
 .../helix/integration/TestStatusUpdate.java     |   7 +-
 .../integration/TestZkCallbackHandlerLeak.java  |   4 +-
 .../helix/integration/TestZkConnectionLost.java |  16 +-
 .../manager/ClusterControllerManager.java       |   6 +-
 .../manager/ClusterDistributedController.java   |   6 +-
 .../manager/MockParticipantManager.java         |   6 +-
 .../integration/manager/ZkTestManager.java      |   4 +-
 .../rebalancer/TestAutoRebalance.java           |   7 +-
 .../TestAutoRebalancePartitionLimit.java        |   8 +-
 .../TestCustomizedIdealStateRebalancer.java     |   8 +-
 .../rebalancer/TestFullAutoNodeTagging.java     |  15 +-
 .../helix/manager/zk/TestZNRecordSizeLimit.java | 256 ++++++++++---------
 .../zk/TestZkCacheAsyncOpSingleThread.java      |   5 +-
 .../zk/TestZkCacheSyncOpSingleThread.java       |   8 +-
 .../apache/helix/manager/zk/TestZkFlapping.java |  82 +-----
 .../helix/manager/zk/TestZkReconnect.java       |  23 +-
 .../org/apache/helix/mock/MockZkClient.java     |   3 +-
 .../helix/mock/controller/MockController.java   |  10 +-
 .../helix/participant/MockZKHelixManager.java   |   4 +-
 .../store/zk/TestZkHelixPropertyStore.java      |   4 +-
 .../apache/helix/tools/TestClusterSetup.java    |  39 ++-
 .../apache/helix/rest/server/ServerContext.java |  27 +-
 .../rest/server/resources/AbstractResource.java |   9 -
 .../resources/helix/AbstractHelixResource.java  |  10 +-
 .../server/resources/helix/ClusterAccessor.java |   4 +-
 .../resources/helix/ResourceAccessor.java       |   8 +-
 .../helix/rest/server/AbstractTestClass.java    |  21 +-
 60 files changed, 663 insertions(+), 578 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
index 3f3d999..27ecc40 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,16 +42,16 @@ public class HierarchicalDataHolder<T> {
    * currentVersion, gets updated when data is read from original source
    */
   AtomicLong currentVersion;
-  private final ZkClient _zkClient;
+  private final HelixZkClient _zkClient;
   private final String _rootPath;
   private final FileFilter _filter;
 
-  public HierarchicalDataHolder(ZkClient client, String rootPath, FileFilter filter) {
+  public HierarchicalDataHolder(HelixZkClient client, String rootPath, FileFilter filter) {
     this._zkClient = client;
     this._rootPath = rootPath;
     this._filter = filter;
     // Node<T> initialValue = new Node<T>();
-    root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
+    root = new AtomicReference<>();
     currentVersion = new AtomicLong(1);
     refreshData();
   }
@@ -99,7 +99,7 @@ public class HierarchicalDataHolder<T> {
           Node<T> oldChild =
               (oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
           if (newRoot.children == null) {
-            newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
+            newRoot.children = new ConcurrentHashMap<>();
           }
           if (!newRoot.children.contains(child)) {
             newRoot.children.put(child, new Node<T>());

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
index b89d830..71e6662 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -51,10 +52,11 @@ public class IdealStateBuilderExample {
     final String clusterName = args[1];
     RebalanceMode idealStateMode = RebalanceMode.valueOf(args[2].toUpperCase());
 
-    ZkClient zkclient =
-        new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            new ZNRecordSerializer());
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    final HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
 
     // add cluster
     admin.addCluster(clusterName, true);
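
Since the example now obtains its client from SharedZkClientFactory, the handle
should still be closed when the caller is done; assuming the shared factory
reference-counts the underlying ZooKeeper connection, close() releases this
handle without tearing the connection away from other holders. A lifecycle
sketch with a placeholder address:

  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.client.HelixZkClient;
  import org.apache.helix.manager.zk.client.SharedZkClientFactory;

  public class SharedClientLifecycleSketch {
    public static void main(String[] args) {
      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
      clientConfig.setZkSerializer(new ZNRecordSerializer());
      HelixZkClient zkClient = SharedZkClientFactory.getInstance()
          .buildZkClient(new HelixZkClient.ZkConnectionConfig("localhost:2181"), clientConfig);
      try {
        // ... use the client, e.g. through ZKHelixAdmin ...
      } finally {
        // Release this handle; other holders of the shared connection are unaffected.
        zkClient.close();
      }
    }
  }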

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
index 7c5192d..723cbbe 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -31,7 +32,7 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 /**
  * Ideal state json format file used in this example for CUSTOMIZED ideal state mode
  * <p>
- * 
+ *
  * <pre>
  * {
  * "id" : "TestDB",
@@ -93,10 +94,11 @@ public class IdealStateExample {
     }
 
     // add cluster {clusterName}
-    ZkClient zkclient =
-        new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            new ZNRecordSerializer());
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
     admin.addCluster(clusterName, true);
 
     // add MasterSlave state mode definition

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index b6d452d..9e9d1a7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -179,19 +179,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  @Deprecated
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
     this(manager, client, propertyKey, listener, eventTypes, changeType, null);
   }
 
-  @Deprecated
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
-      Object listener, EventType[] eventTypes, ChangeType changeType,
-      HelixCallbackMonitor monitor) {
-    this(manager, (HelixZkClient) client, propertyKey, listener, eventTypes, changeType, monitor);
-  }
-
   public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
         Object listener, EventType[] eventTypes, ChangeType changeType,
         HelixCallbackMonitor monitor) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 36fb969..28641e0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -77,7 +77,7 @@ public class ParticipantManager {
   final LiveInstanceInfoProvider _liveInstanceInfoProvider;
   final List<PreConnectCallback> _preConnectCallbacks;
 
-  public ParticipantManager(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+  public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
       LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks) {
     _zkclient = zkclient;
     _manager = manager;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 98e2737..c673f51 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -715,6 +715,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _messagingService.getExecutor().shutdown();
 
       // TODO reset user defined handlers only
+      // TODO Fix the issue that resetting handlers is blocked when the connection is
+      // disconnected. This is because the reset logic contains ZK operations. -- JJ
       resetHandlers();
 
       if (_leaderElectionHandler != null) {
@@ -902,10 +904,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     boolean isConnected;
     do {
       isConnected =
-          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+          _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
       if (!isConnected) {
         LOG.error("fail to connect zkserver: " + _zkAddress + " in "
-            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+            + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
             + ", clusterName: " + _clusterName);
         continue;
       }
@@ -998,6 +1000,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         }
 
         try {
+          // TODO Call disconnect in another thread.
+          // handleStateChanged is triggered in the ZkClient event thread. The disconnect logic
+          // interrupts this thread, which prevents ZkClient.close() from completing and leaves
+          // the client in an inconsistent state.
           disconnect();
         } catch (Exception ex) {
           LOG.error("Disconnect HelixManager is not completely done.", ex);
@@ -1094,7 +1100,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _participantManager.reset();
     }
     _participantManager =
-        new ParticipantManager(this, (ZkClient) _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+        new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
             _preConnectCallbacks);
     _participantManager.handleNewSession();
   }
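
The TODO above suggests moving disconnect() off the ZkClient event thread. A
hedged caller-side sketch of that idea, not the manager's eventual fix:

  import org.apache.helix.HelixManager;

  public class AsyncDisconnectSketch {
    // Calling disconnect() from the ZkClient event thread interrupts that thread
    // and can leave the client half-closed; hand the call to another thread.
    static void disconnectAsync(final HelixManager manager) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          manager.disconnect();
        }
      }, "helix-async-disconnect").start();
    }
  }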

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 9c18d09..4d2a93d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -1491,6 +1491,10 @@ public class ZkClient implements Watcher {
           throw new HelixException(
               "Unable to connect to zookeeper server with the specified ZkConnection");
         }
+        // TODO Refine the initial state here. We pre-configure it as connected, which may not
+        // be the case if the connection is still connecting or recovering. -- JJ
+        // For a shared client, event notifications will not be forwarded until the watcher is
+        // added to the connection manager.
         setCurrentState(KeeperState.SyncConnected);
       }
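
Because a freshly built shared client is optimistically marked SyncConnected,
callers that need a live connection can verify it explicitly with the wait API
used elsewhere in this commit; a small sketch with a placeholder address:

  import java.util.concurrent.TimeUnit;

  import org.apache.helix.manager.zk.client.HelixZkClient;
  import org.apache.helix.manager.zk.client.SharedZkClientFactory;

  public class WaitUntilConnectedSketch {
    public static void main(String[] args) {
      HelixZkClient zkClient = SharedZkClientFactory.getInstance()
          .buildZkClient(new HelixZkClient.ZkConnectionConfig("localhost:2181"));
      // Confirm connectivity instead of trusting the pre-configured state.
      if (!zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
          TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("ZooKeeper not reachable within timeout");
      }
      zkClient.close();
    }
  }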
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 4fe732b..0225f83 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,7 +25,7 @@ import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
@@ -84,13 +84,13 @@ public class TaskDriver {
         manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
-  public TaskDriver(ZkClient client, String clusterName) {
+  public TaskDriver(HelixZkClient client, String clusterName) {
     this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
   }
 
-  public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+  public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+        new ZkHelixPropertyStore<>(baseAccessor,
             PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index af80b48..7129c9f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -29,7 +29,7 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Partition;
 import org.slf4j.Logger;
@@ -54,7 +54,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
 
   final List<String> _expectSortedLiveNodes; // always sorted
 
-  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+  public ClusterExternalViewVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
     _expectSortedLiveNodes = expectLiveNodes;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
index d1187ab..164cfcc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
@@ -19,11 +19,11 @@ package org.apache.helix.tools;
  * under the License.
  */
 
-import org.apache.helix.manager.zk.ZkClient;
-
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.helix.manager.zk.client.HelixZkClient;
+
 /**
  * Please use the class is in tools.ClusterVerifiers.
  */
@@ -32,7 +32,7 @@ public class ClusterLiveNodesVerifier extends ClusterVerifier {
 
   final List<String> _expectSortedLiveNodes; // always sorted
 
-  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+  public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
     _expectSortedLiveNodes = expectLiveNodes;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 94a5f70..d21877f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -45,7 +45,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
@@ -135,22 +136,24 @@ public class ClusterSetup {
 
   static Logger _logger = LoggerFactory.getLogger(ClusterSetup.class);
   String _zkServerAddress;
-  ZkClient _zkClient;
+  HelixZkClient _zkClient;
   HelixAdmin _admin;
 
   public ClusterSetup(String zkServerAddress) {
     _zkServerAddress = zkServerAddress;
-    _zkClient = ZKClientPool.getZkClient(_zkServerAddress);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkServerAddress));
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
     _admin = new ZKHelixAdmin(_zkClient);
   }
 
-  public ClusterSetup(ZkClient zkClient) {
+  public ClusterSetup(HelixZkClient zkClient) {
     _zkServerAddress = zkClient.getServers();
     _zkClient = zkClient;
     _admin = new ZKHelixAdmin(_zkClient);
   }
 
-  public ClusterSetup(ZkClient zkClient, HelixAdmin zkHelixAdmin) {
+  public ClusterSetup(HelixZkClient zkClient, HelixAdmin zkHelixAdmin) {
     _zkServerAddress = zkClient.getServers();
     _zkClient = zkClient;
     _admin = zkHelixAdmin;
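
Both construction styles survive the change: ClusterSetup can build its own
shared client from an address, or accept a caller-managed HelixZkClient. A
brief sketch, with placeholder address and cluster names:

  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.client.HelixZkClient;
  import org.apache.helix.manager.zk.client.SharedZkClientFactory;
  import org.apache.helix.tools.ClusterSetup;

  public class ClusterSetupSketch {
    public static void main(String[] args) {
      // Style 1: ClusterSetup builds and owns a shared client internally.
      ClusterSetup fromAddress = new ClusterSetup("localhost:2181");
      fromAddress.addCluster("EXAMPLE_CLUSTER_A", true);

      // Style 2: hand in a caller-managed HelixZkClient.
      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
      clientConfig.setZkSerializer(new ZNRecordSerializer());
      HelixZkClient zkClient = SharedZkClientFactory.getInstance()
          .buildZkClient(new HelixZkClient.ZkConnectionConfig("localhost:2181"), clientConfig);
      ClusterSetup fromClient = new ClusterSetup(zkClient);
      fromClient.addCluster("EXAMPLE_CLUSTER_B", true);
      zkClient.close();
    }
  }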

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index cc508ef..cc16ce2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -57,19 +58,19 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.util.ZKClientPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 /**
  * This class is deprecated, please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers
  */
@@ -98,10 +99,11 @@ public class ClusterStateVerifier {
   @Deprecated
   static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
     final CountDownLatch _countDown;
-    final ZkClient _zkClient;
+    final HelixZkClient _zkClient;
     final Verifier _verifier;
 
-    public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
+    public ExtViewVeriferZkListener(CountDownLatch countDown, HelixZkClient zkClient,
+        ZkVerifier verifier) {
       _countDown = countDown;
       _zkClient = zkClient;
       _verifier = verifier;
@@ -136,17 +138,20 @@ public class ClusterStateVerifier {
     }
   }
 
-  private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
+  private static HelixZkClient validateAndGetClient(String zkAddr, String clusterName) {
     if (zkAddr == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkAddr|clusterName");
     }
-    return ZKClientPool.getZkClient(zkAddr);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    return DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
   }
 
   public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
     private final String clusterName;
     private final Map<String, Map<String, String>> errStates;
-    private final ZkClient zkClient;
+    private final HelixZkClient zkClient;
     private final Set<String> resources;
 
     public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
@@ -163,7 +168,7 @@ public class ClusterStateVerifier {
       this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
     }
 
-    public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
+    public BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
         Map<String, Map<String, String>> errStates, Set<String> resources) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
@@ -406,7 +411,7 @@ public class ClusterStateVerifier {
 
     @Override
     public ZkClient getZkClient() {
-      return zkClient;
+      return (ZkClient) zkClient;
     }
 
     @Override
@@ -425,13 +430,13 @@ public class ClusterStateVerifier {
 
   public static class MasterNbInExtViewVerifier implements ZkVerifier {
     private final String clusterName;
-    private final ZkClient zkClient;
+    private final HelixZkClient zkClient;
 
     public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
       this(validateAndGetClient(zkAddr, clusterName), clusterName);
     }
 
-    public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
+    public MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
       }
@@ -454,7 +459,7 @@ public class ClusterStateVerifier {
 
     @Override
     public ZkClient getZkClient() {
-      return zkClient;
+      return (ZkClient) zkClient;
     }
 
     @Override
@@ -555,7 +560,7 @@ public class ClusterStateVerifier {
   public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
     long startTime = System.currentTimeMillis();
     CountDownLatch countDown = new CountDownLatch(1);
-    ZkClient zkClient = verifier.getZkClient();
+    HelixZkClient zkClient = verifier.getZkClient();
     String clusterName = verifier.getClusterName();
 
     // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
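
Note the switch from the pooled ZKClientPool to DedicatedZkClientFactory here:
as used in this commit, a dedicated client owns its ZooKeeper connection (which
a verifier registering watches wants for itself) and must be closed by the
caller. A sketch of the resulting pattern, with a placeholder address:

  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
  import org.apache.helix.manager.zk.client.HelixZkClient;

  public class DedicatedClientSketch {
    public static void main(String[] args) {
      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
      clientConfig.setZkSerializer(new ZNRecordSerializer());
      // Dedicated: this handle does not share its connection with other clients.
      HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
          .buildZkClient(new HelixZkClient.ZkConnectionConfig("localhost:2181"), clientConfig);
      try {
        // ... run verification logic against this client ...
      } finally {
        zkClient.close(); // closing tears down the dedicated connection as well
      }
    }
  }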

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
index 5697bcf..d6e5a73 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
@@ -27,7 +27,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class ClusterVerifier implements IZkChildListener, IZkDataListener {
   private static Logger LOG = LoggerFactory.getLogger(ClusterVerifier.class);
 
-  protected final ZkClient _zkclient;
+  protected final HelixZkClient _zkclient;
   protected final String _clusterName;
   protected final HelixDataAccessor _accessor;
   protected final PropertyKey.Builder _keyBuilder;
@@ -58,7 +58,7 @@ public abstract class ClusterVerifier implements IZkChildListener, IZkDataListen
     }
   }
 
-  public ClusterVerifier(ZkClient zkclient, String clusterName) {
+  public ClusterVerifier(HelixZkClient zkclient, String clusterName) {
     _zkclient = zkclient;
     _clusterName = clusterName;
     _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkclient));

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6e73df6..3517444 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -33,6 +33,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
@@ -73,7 +74,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     _clusterDataCache = new ClusterDataCache();
   }
 
-  public BestPossibleExternalViewVerifier(ZkClient zkClient, String clusterName,
+  public BestPossibleExternalViewVerifier(HelixZkClient zkClient, String clusterName,
       Set<String> resources, Map<String, Map<String, String>> errStates,
       Set<String> expectLiveInstances) {
     super(zkClient, clusterName);
@@ -89,7 +90,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private String _zkAddr;
-    private ZkClient _zkClient;
+    private HelixZkClient _zkClient;
 
     public Builder(String clusterName) {
       _clusterName = clusterName;
@@ -148,11 +149,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
       return this;
     }
 
-    public ZkClient getZkClient() {
+    public HelixZkClient getHelixZkClient() {
       return _zkClient;
     }
 
-    public Builder setZkClient(ZkClient zkClient) {
+    @Deprecated
+    public ZkClient getZkClient() {
+      return (ZkClient) getHelixZkClient();
+    }
+    public Builder setZkClient(HelixZkClient zkClient) {
       _zkClient = zkClient;
       return this;
     }
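
A rough usage sketch of the updated Builder; build() and verify() are assumed
from the verifier's existing API rather than shown in this hunk, and the
address and cluster name are placeholders:

  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.client.HelixZkClient;
  import org.apache.helix.manager.zk.client.SharedZkClientFactory;
  import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;

  public class VerifierBuilderSketch {
    public static void main(String[] args) {
      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
      clientConfig.setZkSerializer(new ZNRecordSerializer());
      HelixZkClient zkClient = SharedZkClientFactory.getInstance()
          .buildZkClient(new HelixZkClient.ZkConnectionConfig("localhost:2181"), clientConfig);

      // setZkClient now takes the HelixZkClient interface; the deprecated
      // getZkClient() remains only as a casting bridge for old callers.
      BestPossibleExternalViewVerifier verifier =
          new BestPossibleExternalViewVerifier.Builder("EXAMPLE_CLUSTER")
              .setZkClient(zkClient)
              .build();
      boolean converged = verifier.verify();
      System.out.println("Cluster converged: " + converged);
      zkClient.close();
    }
  }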

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
index 2a71566..b4d3862 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
@@ -24,16 +24,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 
 public class ClusterLiveNodesVerifier extends ZkHelixClusterVerifier {
 
   final Set<String> _expectLiveNodes;
 
-  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+  public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
-    _expectLiveNodes = new HashSet<String>(expectLiveNodes);
+    _expectLiveNodes = new HashSet<>(expectLiveNodes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index a1d12fa..e714789 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -63,7 +64,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
     _expectLiveInstances = expectLiveInstances;
   }
 
-  public StrictMatchExternalViewVerifier(ZkClient zkClient, String clusterName,
+  public StrictMatchExternalViewVerifier(HelixZkClient zkClient, String clusterName,
       Set<String> resources, Set<String> expectLiveInstances) {
     super(zkClient, clusterName);
     _resources = resources;
@@ -75,7 +76,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private String _zkAddr;
-    private ZkClient _zkClient;
+    private HelixZkClient _zkClient;
 
     public StrictMatchExternalViewVerifier build() {
       if (_clusterName == null || (_zkAddr == null && _zkClient == null)) {
@@ -125,11 +126,16 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
       return this;
     }
 
-    public ZkClient getZkClient() {
+    public HelixZkClient getHelixZkClient() {
       return _zkClient;
     }
 
-    public Builder setZkClient(ZkClient zkClient) {
+    @Deprecated
+    public ZkClient getZkClient() {
+      return (ZkClient) getHelixZkClient();
+    }
+
+    public Builder setZkClient(HelixZkClient zkClient) {
       _zkClient = zkClient;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index dbf9272..9c24f51 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -29,9 +29,11 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.util.ZKClientPool;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,7 @@ public abstract class ZkHelixClusterVerifier
   protected static int DEFAULT_PERIOD = 100;
 
 
-  protected final ZkClient _zkClient;
+  protected final HelixZkClient _zkClient;
   protected final String _clusterName;
   protected final HelixDataAccessor _accessor;
   protected final PropertyKey.Builder _keyBuilder;
@@ -90,7 +92,7 @@ public abstract class ZkHelixClusterVerifier
     }
   }
 
-  public ZkHelixClusterVerifier(ZkClient zkClient, String clusterName) {
+  public ZkHelixClusterVerifier(HelixZkClient zkClient, String clusterName) {
     if (zkClient == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkClient|clusterName");
     }
@@ -104,7 +106,9 @@ public abstract class ZkHelixClusterVerifier
     if (zkAddr == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkAddr|clusterName");
     }
-    _zkClient = ZKClientPool.getZkClient(zkAddr);
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
     _clusterName = clusterName;
     _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     _keyBuilder = _accessor.keyBuilder();
@@ -285,10 +289,15 @@ public abstract class ZkHelixClusterVerifier
     }
   }
 
-  public ZkClient getZkClient() {
+  public HelixZkClient getHelixZkClient() {
     return _zkClient;
   }
 
+  @Deprecated
+  public ZkClient getZkClient() {
+    return (ZkClient) getHelixZkClient();
+  }
+
   public String getClusterName() {
     return _clusterName;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index 908bba5..6a757f3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -34,7 +34,6 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.PropertyJsonComparator;
@@ -536,7 +535,7 @@ public class TestExecutor {
     return result;
   }
 
-  private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, ZkClient zkClient,
+  private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, HelixZkClient zkClient,
       ZNRecord diff) {
     String path = arg._znodePath;
     ZnodePropertyType type = arg._propertyType;
@@ -639,12 +638,12 @@ public class TestExecutor {
   private static class ExecuteCommand implements Runnable {
     private final TestCommand _command;
     private final long _startTime;
-    private final ZkClient _zkClient;
+    private final HelixZkClient _zkClient;
     private final CountDownLatch _countDown;
     private final Map<TestCommand, Boolean> _testResults;
 
     public ExecuteCommand(long startTime, TestCommand command, CountDownLatch countDown,
-        ZkClient zkClient, Map<TestCommand, Boolean> testResults) {
+        HelixZkClient zkClient, Map<TestCommand, Boolean> testResults) {
       _startTime = startTime;
       _command = command;
       _countDown = countDown;
@@ -735,9 +734,7 @@ public class TestExecutor {
         }
         _countDown.countDown();
         if (_countDown.getCount() == 0) {
-          if (_zkClient != null && _zkClient.getConnection() != null)
-
-          {
+          if (_zkClient != null && !_zkClient.isClosed()) {
             _zkClient.close();
           }
         }
@@ -768,7 +765,7 @@ public class TestExecutor {
 
       TestTrigger trigger = command._trigger;
       command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
-      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient,
+      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
           testResults)).start();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
index dc2af8f..18f06f4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
@@ -35,11 +35,12 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.tools.ClusterExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ClusterLiveNodesVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,11 +63,11 @@ public class IntegrationTestUtil {
   public static final String readLeader = "readLeader";
   public static final String verifyClusterState = "verifyClusterState";
 
-  final ZkClient _zkclient;
+  final HelixZkClient _zkclient;
   final ZNRecordSerializer _serializer;
   final long _timeoutValue;
 
-  public IntegrationTestUtil(ZkClient zkclient, long timeoutValue) {
+  public IntegrationTestUtil(HelixZkClient zkclient, long timeoutValue) {
     _zkclient = zkclient;
     _timeoutValue = timeoutValue;
     _serializer = new ZNRecordSerializer();
@@ -213,10 +214,10 @@ public class IntegrationTestUtil {
       System.exit(1);
     }
 
-    String zkServer = cmd.getOptionValue(zkSvr);
-    ZkClient zkclient =
-        new ZkClient(zkServer, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(cmd.getOptionValue(zkSvr)), clientConfig);
 
     long timeoutValue = DEFAULT_TIMEOUT;
     if (cmd.hasOption(timeout)) {
@@ -229,7 +230,7 @@ public class IntegrationTestUtil {
       }
     }
 
-    IntegrationTestUtil util = new IntegrationTestUtil(zkclient, timeoutValue);
+    IntegrationTestUtil util = new IntegrationTestUtil(zkClient, timeoutValue);
 
     if (cmd != null) {
       if (cmd.hasOption(verifyExternalView)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
index 0980e48..3350d57 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -27,7 +27,7 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.zookeeper.ZooKeeper.States;
 
 public class ZKClientPool {
-  static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
+  static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<>();
   static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   public static ZkClient getZkClient(String zkServer) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 2f25e30..edc0646 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -50,7 +51,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -144,7 +146,7 @@ public class TestHelper {
     }
   }
 
-  public static void setupEmptyCluster(ZkClient zkClient, String clusterName) {
+  public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
     admin.addCluster(clusterName, true);
   }
@@ -211,7 +213,8 @@ public class TestHelper {
 
   public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
       Set<String> instanceNames, String zkAddr) {
-    ZkClient zkClient = new ZkClient(zkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
@@ -257,17 +260,18 @@ public class TestHelper {
         RebalanceMode.SEMI_AUTO, doRebalance);
   }
 
-  public static void setupCluster(String clusterName, String ZkAddr, int startPort,
+  public static void setupCluster(String clusterName, String zkAddr, int startPort,
       String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
       int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance)
       throws Exception {
-    ZkClient zkClient = new ZkClient(ZkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     if (zkClient.exists("/" + clusterName)) {
       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
       zkClient.deleteRecursively("/" + clusterName);
     }
 
-    ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+    ClusterSetup setupTool = new ClusterSetup(zkAddr);
     setupTool.addCluster(clusterName, true);
 
     for (int i = 0; i < nodesNb; i++) {
@@ -286,12 +290,12 @@ public class TestHelper {
     zkClient.close();
   }
 
-  public static void dropCluster(String clusterName, ZkClient zkClient) throws Exception {
+  public static void dropCluster(String clusterName, HelixZkClient zkClient) throws Exception {
     ClusterSetup setupTool = new ClusterSetup(zkClient);
     dropCluster(clusterName, zkClient, setupTool);
   }
 
-  public static void dropCluster(String clusterName, ZkClient zkClient, ClusterSetup setup) {
+  public static void dropCluster(String clusterName, HelixZkClient zkClient, ClusterSetup setup) {
     String namespace = "/" + clusterName;
     if (zkClient.exists(namespace)) {
       try {
@@ -310,7 +314,8 @@ public class TestHelper {
    */
   public static void verifyState(String clusterName, String zkAddr,
       Map<String, Set<String>> stateMap, String state) {
-    ZkClient zkClient = new ZkClient(zkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
@@ -513,7 +518,7 @@ public class TestHelper {
     System.out.println("END:Print cache");
   }
 
-  public static void readZkRecursive(String path, Map<String, ZNode> map, ZkClient zkclient) {
+  public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
     try {
       Stat stat = new Stat();
       ZNRecord record = zkclient.readData(path, stat);
@@ -554,7 +559,7 @@ public class TestHelper {
   }
 
   public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
-      ZkClient zkclient, boolean needVerifyStat) {
+      HelixZkClient zkclient, boolean needVerifyStat) {
     // read everything
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     Map<String, ZNode> cache = new HashMap<String, ZNode>();
@@ -568,12 +573,12 @@ public class TestHelper {
   }
 
   public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
-      ZkClient zkclient, boolean needVerifyStat) {
+      HelixZkClient zkclient, boolean needVerifyStat) {
     return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
   }
 
   public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
-      Map<String, ZNode> cache, ZkClient zkclient, boolean needVerifyStat) {
+      Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
     // read everything on zk under paths
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     for (String path : paths) {
@@ -799,7 +804,7 @@ public class TestHelper {
     return sb.toString();
   }
 
-  public static void printZkListeners(ZkClient client) throws Exception {
+  public static void printZkListeners(HelixZkClient client) throws Exception {
     Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
     Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
index 07aa4f4..bd123be 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
@@ -20,27 +20,29 @@ package org.apache.helix;
  */
 
 import java.io.FileFilter;
+
 import org.apache.helix.controller.HierarchicalDataHolder;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestHierarchicalDataStore extends ZkUnitTestBase {
-  protected static ZkClient _zkClientString = null;
+  protected static HelixZkClient _zkClient = null;
 
-  @Test(groups = {
-    "unitTest"
+  @Test(groups = { "unitTest"
   })
+
   public void testHierarchicalDataStore() {
-    _zkClientString = new ZkClient(ZK_ADDR, 1000, 3000);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
 
     String path = "/tmp/testHierarchicalDataStore";
     FileFilter filter = null;
-    // _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _zkClientString.deleteRecursively(path);
+    _zkClient.deleteRecursively(path);
     HierarchicalDataHolder<ZNRecord> dataHolder =
-        new HierarchicalDataHolder<ZNRecord>(_zkClientString, path, filter);
+        new HierarchicalDataHolder<ZNRecord>(_zkClient, path, filter);
     dataHolder.print();
     AssertJUnit.assertFalse(dataHolder.refreshData());
 
@@ -69,12 +71,12 @@ public class TestHierarchicalDataStore extends ZkUnitTestBase {
   }
 
   private void set(String path, String data) {
-    _zkClientString.writeData(path, data);
+    _zkClient.writeData(path, data);
   }
 
   private void add(String path, String data) {
-    _zkClientString.createPersistent(path, true);
-    _zkClientString.writeData(path, data);
+    _zkClient.createPersistent(path, true);
+    _zkClient.writeData(path, data);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index ee0a7c7..b80e4d6 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -29,8 +29,6 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
index 8b25214..5ef27a5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -20,6 +20,7 @@ package org.apache.helix;
  */
 
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,7 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,6 +66,83 @@ public class TestZkBasis extends ZkUnitTestBase {
     }
   }
 
+
+  @Test
+  public void testZkSessionExpiry() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    String oldSessionId = ZkTestHelper.getSessionId(client);
+    ZkTestHelper.expireSession(client);
+    String newSessionId = ZkTestHelper.getSessionId(client);
+    Assert.assertNotSame(newSessionId, oldSessionId);
+    Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+    client.close();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClient() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+
+    client.close();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close()");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClientInZkClientEventThread() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    final CountDownLatch waitCallback = new CountDownLatch(1);
+    final ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    client.subscribeDataChanges(path, new IZkDataListener() {
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        client.close();
+        waitCallback.countDown();
+      }
+    });
+
+    client.writeData(path, new ZNRecord("test"));
+    waitCallback.await();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close() in its own event-thread");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
   /**
    * test zk watchers are renewed automatically after session expiry
    * zookeeper-client side keeps all registered watchers see ZooKeeper.WatchRegistration.register()
@@ -76,14 +155,13 @@ public class TestZkBasis extends ZkUnitTestBase {
    */
   @Test
   public void testWatchRenew() throws Exception {
-
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String testName = className + "_" + methodName;
 
     final ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
     // make sure "/testName/test" doesn't exist
     final String path = "/" + testName + "/test";
     client.delete(path);
@@ -127,8 +205,8 @@ public class TestZkBasis extends ZkUnitTestBase {
     String testName = className + "_" + methodName;
 
     final ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
     // make sure "/testName/test" doesn't exist
     final String path = "/" + testName + "/test";
     client.createPersistent(path, true);

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
index b898b8c..e5851f5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.TestCommand;
@@ -210,10 +209,8 @@ public class TestZnodeModify extends ZkUnitTestBase {
       public void run() {
         try {
           Thread.sleep(3000);
-          final ZkClient zkClient = new ZkClient(ZK_ADDR);
-          zkClient.setZkSerializer(new ZNRecordSerializer());
-          zkClient.createPersistent(pathChild1, true);
-          zkClient.writeData(pathChild1, record);
+          _gZkClient.createPersistent(pathChild1, true);
+          _gZkClient.writeData(pathChild1, record);
         } catch (InterruptedException e) {
           logger.error("Interrupted sleep", e);
         }
@@ -228,28 +225,19 @@ public class TestZnodeModify extends ZkUnitTestBase {
 
   }
 
-  ZkClient _zkClient;
-
   @BeforeClass()
   public void beforeClass() {
     System.out.println("START " + getShortClassName() + " at "
         + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    if (_zkClient.exists(PREFIX)) {
-      _zkClient.deleteRecursively(PREFIX);
+    if (_gZkClient.exists(PREFIX)) {
+      _gZkClient.deleteRecursively(PREFIX);
     }
-
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
-
     System.out
         .println("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   private ZNRecord getExampleZNRecord() {
@@ -267,5 +255,4 @@ public class TestZnodeModify extends ZkUnitTestBase {
     record.setListField("TestDB_0", list);
     return record;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 32d1085..19cd2e8 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -32,6 +32,10 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -41,6 +45,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -54,6 +59,7 @@ import org.testng.Assert;
 
 public class ZkTestHelper {
   private static Logger LOG = LoggerFactory.getLogger(ZkTestHelper.class);
+  private static ExecutorService _executor = Executors.newSingleThreadExecutor();
 
   static {
     // Logger.getRootLogger().setLevel(Level.DEBUG);
@@ -62,11 +68,12 @@ public class ZkTestHelper {
   /**
    * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
    */
-  public static void simulateZkStateReconnected(ZkClient client) {
+  public static void simulateZkStateReconnected(HelixZkClient client) {
+    ZkClient zkClient = (ZkClient) client;
     WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
-    client.process(event);
+    zkClient.process(event);
     event = new WatchedEvent(EventType.None, KeeperState.SyncConnected, null);
-    client.process(event);
+    zkClient.process(event);
   }
 
   /**
@@ -74,19 +81,20 @@ public class ZkTestHelper {
    * @param client
    * @return
    */
-  public static String getSessionId(ZkClient client) {
-    ZkConnection connection = ((ZkConnection) client.getConnection());
+  public static String getSessionId(HelixZkClient client) {
+    ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
     ZooKeeper curZookeeper = connection.getZookeeper();
     return Long.toHexString(curZookeeper.getSessionId());
   }
 
   /**
    * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
-   * @param zkClient
+   * @param client
    * @throws Exception
    */
 
-  public static void disconnectSession(final ZkClient zkClient) throws Exception {
+  public static void disconnectSession(HelixZkClient client) throws Exception {
+    final ZkClient zkClient = (ZkClient) client;
     IZkStateListener listener = new IZkStateListener() {
       @Override
       public void handleStateChanged(KeeperState state) throws Exception {
@@ -96,7 +104,7 @@ public class ZkTestHelper {
       @Override
       public void handleNewSession() throws Exception {
         // make sure zkclient is connected again
-        zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+        zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
         ZooKeeper curZookeeper = connection.getZookeeper();
@@ -138,8 +146,9 @@ public class ZkTestHelper {
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
-  public static void expireSession(final ZkClient zkClient) throws Exception {
+  public static void expireSession(HelixZkClient client) throws Exception {
     final CountDownLatch waitNewSession = new CountDownLatch(1);
+    final ZkClient zkClient = (ZkClient) client;
 
     IZkStateListener listener = new IZkStateListener() {
       @Override
@@ -150,7 +159,7 @@ public class ZkTestHelper {
       @Override
       public void handleNewSession() throws Exception {
         // make sure zkclient is connected again
-        zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+        zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
         ZooKeeper curZookeeper = connection.getZookeeper();
@@ -204,10 +213,11 @@ public class ZkTestHelper {
 
   /**
    * expire zk session asynchronously
-   * @param zkClient
+   * @param client
    * @throws Exception
    */
-  public static void asyncExpireSession(final ZkClient zkClient) throws Exception {
+  public static void asyncExpireSession(HelixZkClient client) throws Exception {
+    final ZkClient zkClient = (ZkClient) client;
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
     LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
@@ -238,7 +248,7 @@ public class ZkTestHelper {
   /*
    * stateMap: partition->instance->state
    */
-  public static boolean verifyState(ZkClient zkclient, String clusterName, String resourceName,
+  public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
       Map<String, Map<String, String>> expectStateMap, String op) {
     boolean result = true;
     ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -382,9 +392,11 @@ public class ZkTestHelper {
     }
   }
 
-  public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
+  public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
     Map<String, List<String>> lists = new HashMap<String, List<String>>();
-    ZkConnection connection = ((ZkConnection) client.getConnection());
+    ZkClient zkClient = (ZkClient) client;
+
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper zk = connection.getZookeeper();
 
     java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
@@ -406,14 +418,14 @@ public class ZkTestHelper {
     HashMap<String, Set<Watcher>> childWatches =
         (HashMap<String, Set<Watcher>>) field2.get(watchManager);
 
-    lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
-    lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
-    lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
+    lists.put("dataWatches", new ArrayList<>(dataWatches.keySet()));
+    lists.put("existWatches", new ArrayList<>(existWatches.keySet()));
+    lists.put("childWatches", new ArrayList<>(childWatches.keySet()));
 
     return lists;
   }
 
-  public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client)
+  public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
     field.setAccessible(true);
@@ -422,7 +434,7 @@ public class ZkTestHelper {
     return dataListener;
   }
 
-  public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client)
+  public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
     field.setAccessible(true);
@@ -431,7 +443,7 @@ public class ZkTestHelper {
     return childListener;
   }
 
-  public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
+  public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
     java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
     field.setAccessible(true);
     Object eventThread = field.get(zkclient);
@@ -456,4 +468,18 @@ public class ZkTestHelper {
     }
     return false;
   }
+
+  public static void injectExpire(HelixZkClient client)
+      throws ExecutionException, InterruptedException {
+    final ZkClient zkClient = (ZkClient) client;
+    Future future = _executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        WatchedEvent event =
+            new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+        zkClient.process(event);
+      }
+    });
+    future.get();
+  }
 }

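The new injectExpire helper above simulates session expiry by handing a synthetic Expired WatchedEvent to ZkClient#process on a background executor and waiting for that call to return. A hypothetical usage sketch, assuming a connected HelixZkClient named client (the variable name is illustrative):

    String oldSessionId = ZkTestHelper.getSessionId(client);
    ZkTestHelper.injectExpire(client); // returns once the Expired event has been processed
    // A real test would wait for the client to reconnect before checking; once it
    // has, it should be on a brand-new session.
    Assert.assertNotEquals(ZkTestHelper.getSessionId(client), oldSessionId);
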
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index c69744e..b0c44e1 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
+
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkServer;
@@ -56,6 +57,8 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
@@ -72,7 +75,6 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.helix.util.ZKClientPool;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -90,7 +92,7 @@ public class ZkTestBase {
   private static Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
 
   protected static ZkServer _zkServer;
-  protected static ZkClient _gZkClient;
+  protected static HelixZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
   protected static BaseDataAccessor<ZNRecord> _baseAccessor;
 
@@ -113,17 +115,17 @@ public class ZkTestBase {
 
     _zkServer = TestHelper.startZkServer(ZK_ADDR);
     AssertJUnit.assertTrue(_zkServer != null);
-    ZKClientPool.reset();
 
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    _gZkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
     _gSetupTool = new ClusterSetup(_gZkClient);
     _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
   }
 
   @AfterSuite
   public void afterSuite() {
-    ZKClientPool.reset();
     _gZkClient.close();
     TestHelper.stopZkServer(_zkServer);
   }
@@ -149,7 +151,7 @@ public class ZkTestBase {
     return this.getClass().getSimpleName();
   }
 
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
+  protected String getCurrentLeader(HelixZkClient zkClient, String clusterName) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -161,7 +163,7 @@ public class ZkTestBase {
     return leader.getInstanceName();
   }
 
-  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
+  protected void stopCurrentLeader(HelixZkClient zkClient, String clusterName,
       Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
     String leader = getCurrentLeader(zkClient, clusterName);
     Assert.assertTrue(leader != null);
@@ -202,7 +204,7 @@ public class ZkTestBase {
     new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
   }
 
-  protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
+  protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -210,7 +212,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+  protected void enablePersistIntermediateAssignment(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -218,7 +220,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
+  protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -226,7 +228,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
       boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -234,7 +236,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName,
+  protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
       String instanceName, boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
@@ -274,7 +276,7 @@ public class ZkTestBase {
     }
   }
 
-  protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
+  protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
     clusterConfig.setRebalanceDelayTime(delay);
@@ -412,7 +414,7 @@ public class ZkTestBase {
     stage.postProcess();
   }
 
-  public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
+  public void verifyInstance(HelixZkClient zkClient, String clusterName, String instance,
       boolean wantExists) {
     // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
     String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
@@ -422,13 +424,13 @@ public class ZkTestBase {
     AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
   }
 
-  public void verifyResource(ZkClient zkClient, String clusterName, String resource,
+  public void verifyResource(HelixZkClient zkClient, String clusterName, String resource,
       boolean wantExists) {
     String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
     AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
   }
 
-  public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
+  public void verifyEnabled(HelixZkClient zkClient, String clusterName, String instance,
       boolean wantEnabled) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
@@ -438,7 +440,7 @@ public class ZkTestBase {
     AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
   }
 
-  public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
+  public void verifyReplication(HelixZkClient zkClient, String clusterName, String resource, int repl) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -476,8 +478,10 @@ public class ZkTestBase {
     LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
   }
 
-  protected void simulateSessionExpiry(ZkClient zkClient)
+  protected void simulateSessionExpiry(HelixZkClient client)
       throws IOException, InterruptedException, IOException {
+    ZkClient zkClient = (ZkClient) client;
+
     IZkStateListener listener = new IZkStateListener() {
       @Override
       public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
@@ -667,17 +671,21 @@ public class ZkTestBase {
   protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier {
     private final String _clusterName;
     private final String _resourceName;
-    private final ZkClient _zkClient;
+    private final HelixZkClient _zkClient;
 
     /**
      * Instantiate the verifier
-     * @param clusterName the cluster to verify
+     *
+     * @param clusterName  the cluster to verify
      * @param resourceName the resource to verify
      */
     public EmptyZkVerifier(String clusterName, String resourceName) {
       _clusterName = clusterName;
       _resourceName = resourceName;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+
+      _zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
     }
 
     @Override
@@ -717,7 +725,7 @@ public class ZkTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _zkClient;
+      return (ZkClient) _gZkClient;
     }
 
     @Override

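For reference, the construction pattern that replaces ZKClientPool above, pulled out as a standalone sketch (ZK_ADDR stands for a real ZooKeeper address; only API calls already shown in this diff are used):

    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
    clientConfig.setZkSerializer(new ZNRecordSerializer());
    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
    try {
      // ... use the client, e.g. new ZkBaseDataAccessor<ZNRecord>(zkClient) ...
    } finally {
      zkClient.close(); // dedicated clients are not pooled, so each one must be closed
    }
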
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index 3554207..a78e7bf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -20,6 +20,8 @@ package org.apache.helix.integration;
  */
 
 import com.google.common.collect.Maps;
+
+import java.lang.reflect.Method;
 import java.util.Map;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixManager;
@@ -50,11 +52,11 @@ public class TestCorrectnessOnConnectivityLoss {
   private ClusterControllerManager _controller;
 
   @BeforeMethod
-  public void beforeMethod() throws Exception {
-    _zkServer = TestHelper.startZkServer(ZK_ADDR);
+  public void beforeMethod(Method testMethod) throws Exception {
+    _zkServer = TestHelper.startZkServer(ZK_ADDR, null, false);
 
     String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
+    String methodName = testMethod.getName();
     _clusterName = className + "_" + methodName;
     TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant start port
         "localhost", // participant host
@@ -71,6 +73,11 @@ public class TestCorrectnessOnConnectivityLoss {
     _controller.connect();
   }
 
+  @AfterMethod
+  public void afterMethod() {
+    TestHelper.stopZkServer(_zkServer);
+  }
+
   @Test
   public void testParticipant() throws Exception {
     Map<String, Integer> stateReachedCounts = Maps.newHashMap();
@@ -136,11 +143,6 @@ public class TestCorrectnessOnConnectivityLoss {
     }
   }
 
-  @AfterMethod
-  public void afterMethod() throws Exception {
-    TestHelper.stopZkServer(_zkServer);
-  }
-
   @StateModelInfo(initialState = "OFFLINE", states = {
       "MASTER", "SLAVE", "OFFLINE", "ERROR"
   })

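A note on the beforeMethod change above: TestNG injects the about-to-run test method into any configuration method that declares a java.lang.reflect.Method parameter, which avoids relying on TestHelper.getTestMethodName() from inside a @BeforeMethod. A minimal sketch (class name is illustrative):

    import java.lang.reflect.Method;
    import org.testng.annotations.BeforeMethod;
    import org.testng.annotations.Test;

    public class ExampleTest {
      private String _clusterName;

      @BeforeMethod
      public void beforeMethod(Method testMethod) {
        // TestNG supplies the @Test method that is about to execute.
        _clusterName = getClass().getSimpleName() + "_" + testMethod.getName();
      }

      @Test
      public void testSomething() {
        // _clusterName is ExampleTest_testSomething here.
      }
    }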

[2/4] helix git commit: Refactor JobRebalancer to JobDispatcher

Posted by jx...@apache.org.
Refactor JobRebalancer to JobDispatcher

The current JobRebalancer is messy in that it mixes job status updates and scheduling logic together. Refactor JobRebalancer into JobDispatcher, which schedules and updates the status of jobs and tasks.

Task status updates may still be associated with new task assignments.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/01076ca2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/01076ca2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/01076ca2

Branch: refs/heads/master
Commit: 01076ca26ed244f1da682b9d9c97cab8851d6ed3
Parents: 0c3ac37
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Sep 20 16:55:34 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 18:09:47 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/JobDispatcher.java    | 417 +++++++++++++++++++
 .../org/apache/helix/task/JobRebalancer.java    | 396 +-----------------
 .../org/apache/helix/task/TaskConstants.java    |   3 +
 3 files changed, 434 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/01076ca2/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
new file mode 100644
index 0000000..06f6ce4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -0,0 +1,417 @@
+package org.apache.helix.task;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobDispatcher extends AbstractTaskDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(JobDispatcher.class);
+  private ClusterDataCache _clusterDataCache;
+
+  public void updateCache(ClusterDataCache cache) {
+    _clusterDataCache = cache;
+  }
+
+  public ResourceAssignment processJobStatusUpdateandAssignment(String jobName,
+      CurrentStateOutput currStateOutput, IdealState taskIs) {
+    // Fetch job configuration
+    JobConfig jobCfg = _clusterDataCache.getJobConfig(jobName);
+    if (jobCfg == null) {
+      LOG.error("Job configuration is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+    String workflowResource = jobCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = _clusterDataCache.getWorkflowConfig(workflowResource);
+    if (workflowCfg == null) {
+      LOG.error("Workflow configuration is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    WorkflowContext workflowCtx = _clusterDataCache.getWorkflowContext(workflowResource);
+    if (workflowCtx == null) {
+      LOG.error("Workflow context is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState != TargetState.START && targetState != TargetState.STOP) {
+      LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
+          + ".Stop scheduling job " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    // Stop current run of the job if workflow or job is already in final state (failed or
+    // completed)
+    TaskState workflowState = workflowCtx.getWorkflowState();
+    TaskState jobState = workflowCtx.getJobState(jobName);
+    // The job is already in a final state (completed/failed).
+    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
+        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+      LOG.info(String.format(
+          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
+          workflowResource, jobName, workflowState, jobState));
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      _rebalanceScheduler.removeScheduledRebalance(jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    if (!isWorkflowReadyForSchedule(workflowCfg)) {
+      LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
+        workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
+        _clusterDataCache.getJobConfigMap(), _clusterDataCache.getTaskDataCache())) {
+      LOG.info("Job is not ready to run " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    // Fetch any existing context information from the property store.
+    JobContext jobCtx = _clusterDataCache.getJobContext(jobName);
+    if (jobCtx == null) {
+      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
+      jobCtx.setStartTime(System.currentTimeMillis());
+      jobCtx.setName(jobName);
+      workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
+    }
+
+    if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
+      scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
+    }
+
+    // Grab the old assignment, or an empty one if it doesn't exist
+    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(jobName);
+    }
+
+    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
+    // is stored in zk.
+    // Fetch the previous resource assignment from the property store. This is required because of
+    // HELIX-230.
+    Set<String> liveInstances =
+        jobCfg.getInstanceGroupTag() == null ? _clusterDataCache.getEnabledLiveInstances()
+            : _clusterDataCache.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+
+    if (liveInstances.isEmpty()) {
+      LOG.error("No available instance found for job!");
+    }
+
+    TargetState jobTgtState = workflowCfg.getTargetState();
+    jobState = workflowCtx.getJobState(jobName);
+    workflowState = workflowCtx.getWorkflowState();
+
+    if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
+        || TaskState.TIMED_OUT.equals(workflowState))) {
+      jobState = TaskState.TIMING_OUT;
+      workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
+    } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
+      // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
+      // Update running status in workflow context
+      if (jobTgtState == TargetState.STOP) {
+        if (TaskUtil.checkJobStopped(jobCtx)) {
+          workflowCtx.setJobState(jobName, TaskState.STOPPED);
+        } else {
+          workflowCtx.setJobState(jobName, TaskState.STOPPING);
+        }
+        // Workflow has been stopped if all in progress jobs are stopped
+        if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+          workflowCtx.setWorkflowState(TaskState.STOPPED);
+        } else {
+          workflowCtx.setWorkflowState(TaskState.STOPPING);
+        }
+      } else {
+        workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
+        // Workflow is in progress if any task is in progress
+        workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+      }
+    }
+
+    Set<Integer> partitionsToDrop = new TreeSet<>();
+    ResourceAssignment newAssignment =
+        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment,
+            liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _clusterDataCache);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
+
+    // TODO: will be removed when we are trying to get rid of IdealState
+    taskIs = _clusterDataCache.getIdealState(jobName);
+    if (!partitionsToDrop.isEmpty() && taskIs != null) {
+      for (Integer pId : partitionsToDrop) {
+        taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
+      }
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Update Workflow and Job context in data cache and ZK.
+    _clusterDataCache.updateJobContext(jobName, jobCtx);
+    _clusterDataCache.updateWorkflowContext(workflowResource, workflowCtx);
+
+    setPrevResourceAssignment(jobName, newAssignment);
+
+    LOG.debug("Job " + jobName + " new assignment "
+        + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
+    return newAssignment;
+  }
+
+  private ResourceAssignment computeResourceMapping(String jobResource,
+      WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
+      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances,
+      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx,
+      Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) {
+
+    // Used to keep track of tasks that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<>();
+
+    // Used to keep track of tasks that have failed, but whose failure is acceptable
+    Set<Integer> skippedPartitions = new HashSet<>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
+
+    Set<String> excludedInstances =
+        getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);
+
+    // Process all the current assignments of tasks.
+    TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache);
+    Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
+        workflowConfig, workflowCtx, cache.getIdealStates());
+
+    if (allPartitions == null || allPartitions.isEmpty()) {
+      // Empty target partitions, mark the job as FAILED.
+      String failureMsg =
+          "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
+      LOG.info(failureMsg);
+      jobCtx.setInfo(failureMsg);
+      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
+      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+      return new ResourceAssignment(jobResource);
+    }
+
+    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
+        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
+            allPartitions);
+    long currentTime = System.currentTimeMillis();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
+          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+    }
+
+    // Release resource for tasks in terminal state
+    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+        currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
+        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache);
+
+    addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
+
+    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
+        || (jobCfg.getTargetResource() != null
+        && cache.getIdealState(jobCfg.getTargetResource()) != null
+        && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
+      if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
+        failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
+        return buildEmptyAssignment(jobResource, currStateOutput);
+      }
+      workflowCtx.setJobState(jobResource, TaskState.FAILING);
+      // Drop all assigned but not given-up tasks
+      for (int pId : jobCtx.getPartitionSet()) {
+        String instance = jobCtx.getAssignedParticipant(pId);
+        if (jobCtx.getPartitionState(pId) != null && !isTaskGivenup(jobCtx, jobCfg, pId)) {
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
+        }
+        Partition partition = new Partition(pName(jobResource, pId));
+        Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
+        // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
+        // so that Helix will cancel the transition.
+        if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
+        }
+      }
+
+      return toResourceAssignment(jobResource, paMap);
+    }
+
+    if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) {
+      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
+      return buildEmptyAssignment(jobResource, currStateOutput);
+    }
+
+    if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
+      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(),
+          cache);
+      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
+          jobCtx.getFinishTime() - jobCtx.getStartTime());
+      _rebalanceScheduler.removeScheduledRebalance(jobResource);
+      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+      return buildEmptyAssignment(jobResource, currStateOutput);
+    }
+
+    // If job is being timed out and no task is running (for whatever reason), idealState can be
+    // deleted and all tasks
+    // can be dropped(note that Helix doesn't track whether the drop is success or not).
+    if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
+      handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
+      return buildEmptyAssignment(jobResource, currStateOutput);
+    }
+
+    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+    scheduleForNextTask(jobResource, jobCtx, currentTime);
+
+    // Make additional task assignments if needed.
+    if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
+        && jobTgtState == TargetState.START) {
+      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
+          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
+          taskAssignmentCal, allPartitions, currentTime, liveInstances);
+    }
+
+    return toResourceAssignment(jobResource, paMap);
+  }
+
+  private ResourceAssignment toResourceAssignment(String jobResource,
+      Map<Integer, PartitionAssignment> paMap) {
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(jobResource);
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
+          ImmutableMap.of(pa._instance, pa._state));
+    }
+    return ra;
+  }
+
+  private boolean isJobFinished(JobContext jobContext, String jobResource,
+      CurrentStateOutput currentStateOutput) {
+    for (int pId : jobContext.getPartitionSet()) {
+      TaskPartitionState state = jobContext.getPartitionState(pId);
+      Partition partition = new Partition(pName(jobResource, pId));
+      String instance = jobContext.getAssignedParticipant(pId);
+      Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
+          instance);
+      // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
+      if (state == TaskPartitionState.RUNNING
+          || (state == TaskPartitionState.INIT && pendingMessage != null)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the last task assignment for a given job
+   * @param resourceName the name of the job
+   * @return {@link ResourceAssignment} instance, or null if no assignment is available
+   */
+  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
+    ZNRecord r = _manager.getHelixPropertyStore().get(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
+        null, AccessOption.PERSISTENT);
+    return r != null ? new ResourceAssignment(r) : null;
+  }
+
+  /**
+   * Set the last task assignment for a given job
+   * @param resourceName the name of the job
+   * @param ra {@link ResourceAssignment} containing the task assignment
+   */
+  private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) {
+    _manager.getHelixPropertyStore().set(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
+        ra.getRecord(), AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Checks if the job has completed. Looking at the states of all tasks of the job, there are
+   * three kinds: completed, given up, and not given up. The job is complete if all tasks are
+   * completed or given up, and the number of given-up tasks is within the job failure threshold.
+   */
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, JobConfig cfg) {
+    int numOfGivenUpTasks = 0;
+    // Iterate through all tasks, if any one indicates the job has not completed, return false.
+    for (Integer pId : allPartitions) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state != TaskPartitionState.COMPLETED) {
+        if (!isTaskGivenup(ctx, cfg, pId)) {
+          return false;
+        }
+        // If the task is given up, there's still chance the job has completed because of job
+        // failure threshold.
+        numOfGivenUpTasks++;
+      }
+    }
+    return numOfGivenUpTasks <= cfg.getFailureThreshold();
+  }
+
+  /**
+   * @param liveInstances
+   * @param prevAssignment task partition -> (instance -> state)
+   * @param allTaskPartitions all task partitionIds
+   * @return instance -> partitionIds from previous assignment, if the instance is still live
+   */
+  private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
+      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
+      Set<Integer> allTaskPartitions) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : liveInstances) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    for (Partition partition : prevAssignment.getMappedPartitions()) {
+      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+      if (allTaskPartitions.contains(pId)) {
+        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
+        for (String instance : replicaMap.keySet()) {
+          SortedSet<Integer> pList = result.get(instance);
+          if (pList != null) {
+            pList.add(pId);
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * If the job is a targeted job, use fixedTaskAssignmentCalculator. Otherwise, use
+   * threadCountBasedTaskAssignmentCalculator. Both calculators support quota-based scheduling.
+   * @param jobConfig
+   * @param cache
+   * @return
+   */
+  private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
+      ClusterDataCache cache) {
+    AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
+    if (TaskUtil.isGenericTaskJob(jobConfig)) {
+      return new ThreadCountBasedTaskAssignmentCalculator(new ThreadCountBasedTaskAssigner(),
+          assignableInstanceManager);
+    }
+    return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
+  }
+}

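With JobDispatcher extracted, JobRebalancer (diff below) can shrink to a thin adapter over it. A sketch of the expected delegation, using only the two public JobDispatcher methods introduced above; the exact wiring in the remainder of this diff may differ:

    // Inside JobRebalancer.computeBestPossiblePartitionState(...):
    if (_jobDispatcher == null) {
      _jobDispatcher = new JobDispatcher();
    }
    _jobDispatcher.updateCache(clusterData);
    ResourceAssignment newAssignment =
        _jobDispatcher.processJobStatusUpdateandAssignment(jobName, currStateOutput, taskIs);
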
http://git-wip-us.apache.org/repos/asf/helix/blob/01076ca2/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 143053e..11cdf23 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -53,393 +53,25 @@ import com.google.common.collect.ImmutableMap;
  */
 public class JobRebalancer extends TaskRebalancer {
   private static final Logger LOG = LoggerFactory.getLogger(JobRebalancer.class);
-  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
+  private JobDispatcher _jobDispatcher;
 
   @Override
-  public ResourceAssignment computeBestPossiblePartitionState(
-      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, IdealState taskIs, Resource resource,
       CurrentStateOutput currStateOutput) {
+    long startTime = System.currentTimeMillis();
     final String jobName = resource.getResourceName();
     LOG.debug("Computer Best Partition for job: " + jobName);
 
-    // Fetch job configuration
-    JobConfig jobCfg = clusterData.getJobConfig(jobName);
-    if (jobCfg == null) {
-      LOG.error("Job configuration is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflowResource);
-    if (workflowCfg == null) {
-      LOG.error("Workflow configuration is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowResource);
-    if (workflowCtx == null) {
-      LOG.error("Workflow context is NULL for " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState != TargetState.START && targetState != TargetState.STOP) {
-      LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
-          + ".Stop scheduling job " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    // Stop current run of the job if workflow or job is already in final state (failed or
-    // completed)
-    TaskState workflowState = workflowCtx.getWorkflowState();
-    TaskState jobState = workflowCtx.getJobState(jobName);
-    // The job is already in a final state (completed/failed).
-    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
-        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
-      LOG.info(String.format(
-          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
-          workflowResource, jobName, workflowState, jobState));
-      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
-      _rebalanceScheduler.removeScheduledRebalance(jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
-        workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        clusterData.getJobConfigMap(), clusterData.getTaskDataCache())) {
-      LOG.info("Job is not ready to run " + jobName);
-      return buildEmptyAssignment(jobName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-    JobContext jobCtx = clusterData.getJobContext(jobName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
-      jobCtx.setStartTime(System.currentTimeMillis());
-      jobCtx.setName(jobName);
-      workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
-    }
-
-    if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
-      scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(jobName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<String> liveInstances =
-        jobCfg.getInstanceGroupTag() == null ? clusterData.getEnabledLiveInstances()
-            : clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
-
-    if (liveInstances.isEmpty()) {
-      LOG.error("No available instance found for job!");
-    }
-
-    TargetState jobTgtState = workflowCfg.getTargetState();
-    jobState = workflowCtx.getJobState(jobName);
-    workflowState = workflowCtx.getWorkflowState();
-
-    if (jobState == TaskState.IN_PROGRESS && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
-        || TaskState.TIMED_OUT.equals(workflowState))) {
-      jobState = TaskState.TIMING_OUT;
-      workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
-    } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
-      // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
-      // Update running status in workflow context
-      if (jobTgtState == TargetState.STOP) {
-        if (TaskUtil.checkJobStopped(jobCtx)) {
-          workflowCtx.setJobState(jobName, TaskState.STOPPED);
-        } else {
-          workflowCtx.setJobState(jobName, TaskState.STOPPING);
-        }
-        // Workflow has been stopped if all in progress jobs are stopped
-        if (isWorkflowStopped(workflowCtx, workflowCfg)) {
-          workflowCtx.setWorkflowState(TaskState.STOPPED);
-        } else {
-          workflowCtx.setWorkflowState(TaskState.STOPPING);
-        }
-      } else {
-        workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
-        // Workflow is in progress if any task is in progress
-        workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-      }
-    }
-
-    Set<Integer> partitionsToDrop = new TreeSet<>();
-    ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, prevAssignment,
-            liveInstances, currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
-
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
-    taskIs = clusterData.getIdealState(jobName);
-    if (!partitionsToDrop.isEmpty() && taskIs != null) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
-      }
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(jobName, jobCtx);
-    clusterData.updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(jobName, newAssignment);
-
-    LOG.debug("Job " + jobName + " new assignment "
-        + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
-    return newAssignment;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Collection<String> liveInstances,
-      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx,
-      Set<Integer> partitionsToDropFromIs, ClusterDataCache cache) {
-
-    // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<>();
-
-    // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
-
-    Set<String> excludedInstances =
-        getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);
-
-    // Process all the current assignments of tasks.
-    TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache);
-    Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
-        workflowConfig, workflowCtx, cache.getIdealStates());
-
-    if (allPartitions == null || allPartitions.isEmpty()) {
-      // Empty target partitions, mark the job as FAILED.
-      String failureMsg =
-          "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
-      LOG.info(failureMsg);
-      jobCtx.setInfo(failureMsg);
-      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
-      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
-      return new ResourceAssignment(jobResource);
-    }
-
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions);
-    long currentTime = System.currentTimeMillis();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
-    }
-
-    // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
-        currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
-        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache);
-
-    addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
-
-    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
-        || (jobCfg.getTargetResource() != null
-        && cache.getIdealState(jobCfg.getTargetResource()) != null
-        && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
-
-      if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
-        failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
-        return buildEmptyAssignment(jobResource, currStateOutput);
-      }
-      workflowCtx.setJobState(jobResource, TaskState.FAILING);
-      // Drop all assigned but not given-up tasks
-      for (int pId : jobCtx.getPartitionSet()) {
-        String instance = jobCtx.getAssignedParticipant(pId);
-        if (jobCtx.getPartitionState(pId) != null && !isTaskGivenup(jobCtx, jobCfg, pId)) {
-          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
-        }
-        Partition partition = new Partition(pName(jobResource, pId));
-        Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
-        // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
-        // so that Helix will cancel the transition.
-        if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
-          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
-        }
-      }
-
-      return toResourceAssignment(jobResource, paMap);
-    }
-
-    if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) {
-      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
-      return buildEmptyAssignment(jobResource, currStateOutput);
-    }
-
-    if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
-      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(),
-          cache);
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
-          jobCtx.getFinishTime() - jobCtx.getStartTime());
-      _rebalanceScheduler.removeScheduledRebalance(jobResource);
-      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
-      return buildEmptyAssignment(jobResource, currStateOutput);
-    }
-
-    // If job is being timed out and no task is running (for whatever reason), idealState can be
-    // deleted and all tasks
-    // can be dropped(note that Helix doesn't track whether the drop is success or not).
-    if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
-      handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
-      return buildEmptyAssignment(jobResource, currStateOutput);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    // Make additional task assignments if needed.
-    if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
-        && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
-          currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
-          prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
-          taskAssignmentCal, allPartitions, currentTime, liveInstances);
-    }
-
-    return toResourceAssignment(jobResource, paMap);
-  }
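
In sum, the method above evaluates the job in a fixed order: release tasks already in terminal states, mark given-up partitions, fail the job (or move it to FAILING and abort the remaining tasks) when the failure threshold is exceeded or the target resource is disabled, then check for completion and timeout, schedule a rebalance for the nearest delayed task, and finally hand out additional task assignments only while the job is runnable (not timing out and targeted to START).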
-
-  private ResourceAssignment toResourceAssignment(String jobResource,
-      Map<Integer, PartitionAssignment> paMap) {
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-          ImmutableMap.of(pa._instance, pa._state));
-    }
-    return ra;
-  }
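
For example (names hypothetical, and assuming pName joins the job name and partition id with an underscore): a paMap entry 3 -> (host1, RUNNING) for job myJob becomes a replica map {host1: RUNNING} on the partition named myJob_3.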
-
-  private boolean isJobFinished(JobContext jobContext, String jobResource,
-      CurrentStateOutput currentStateOutput) {
-    for (int pId : jobContext.getPartitionSet()) {
-      TaskPartitionState state = jobContext.getPartitionState(pId);
-      Partition partition = new Partition(pName(jobResource, pId));
-      String instance = jobContext.getAssignedParticipant(pId);
-      Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
-          instance);
-      // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
-      if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT
-          && pendingMessage != null)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Get the last task assignment for a given job
-   * @param resourceName the name of the job
-   *
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
-    ZNRecord r = _manager.getHelixPropertyStore().get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-        null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) {
-    _manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Checks whether the job has completed. Each task of the job is in one of three kinds of
-   * state: completed, given up, or not given up. The job is complete if every task is either
-   * completed or given up, and the number of given-up tasks is within the job's failure threshold.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, JobConfig cfg) {
-    int numOfGivenUpTasks = 0;
-    // Iterate through all tasks; if any one indicates the job has not completed, return false.
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state != TaskPartitionState.COMPLETED) {
-        if (!isTaskGivenup(ctx, cfg, pId)) {
-          return false;
-        }
-        // Even if this task is given up, the job may still complete, as long as the total
-        // number of given-up tasks stays within the job failure threshold.
-        numOfGivenUpTasks++;
-      }
-    }
-    return numOfGivenUpTasks <= cfg.getFailureThreshold();
-  }
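
For intuition, the completion rule above can be condensed into a few lines. This is a minimal, self-contained sketch, not the Helix implementation: the task-state strings and the threshold values are invented for the example, and only the counting logic mirrors isJobComplete.

    import java.util.Arrays;
    import java.util.List;

    public class JobCompletionSketch {
      // Mirrors the counting logic of isJobComplete: every task must be either
      // completed or given up, and the given-up count must respect the threshold.
      static boolean isComplete(List<String> taskStates, int failureThreshold) {
        int givenUp = 0;
        for (String state : taskStates) {
          if (!"COMPLETED".equals(state)) {
            if (!"GIVEN_UP".equals(state)) {
              return false; // some task can still make progress
            }
            givenUp++; // given up, but the job may still complete under the threshold
          }
        }
        return givenUp <= failureThreshold;
      }

      public static void main(String[] args) {
        System.out.println(isComplete(Arrays.asList("COMPLETED", "GIVEN_UP"), 1));             // true
        System.out.println(isComplete(Arrays.asList("COMPLETED", "GIVEN_UP", "GIVEN_UP"), 1)); // false
        System.out.println(isComplete(Arrays.asList("COMPLETED", "RUNNING"), 1));              // false
      }
    }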
-
-  /**
-   * @param liveInstances     the currently live instance names
-   * @param prevAssignment    task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
-   *
-   * @return instance -> partitionIds from previous assignment, if the instance is still live
-   */
-  private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : liveInstances) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
-      }
-    }
-    return result;
-  }
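
As a concrete illustration of the inversion above (instance names and partition ids invented): a previous assignment of job_0 -> {host1: RUNNING}, job_1 -> {host1: RUNNING}, job_2 -> {host2: RUNNING} with live instances {host1, host2, host3} yields {host1: [0, 1], host2: [2], host3: []}. A partition whose previous instance is no longer live gets no entry, which leaves it eligible for reassignment later in the pipeline.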
-
-  /**
-   * If the job is a targeted job, use FixedTargetTaskAssignmentCalculator. Otherwise, use
-   * ThreadCountBasedTaskAssignmentCalculator. Both calculators support quota-based scheduling.
-   * @param jobConfig the configuration of the job
-   * @param cache the cluster data cache
-   * @return the {@link TaskAssignmentCalculator} to use for this job
-   */
-  private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
-      ClusterDataCache cache) {
-    AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
-    if (TaskUtil.isGenericTaskJob(jobConfig)) {
-      return new ThreadCountBasedTaskAssignmentCalculator(new ThreadCountBasedTaskAssigner(),
-          assignableInstanceManager);
-    }
-    return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
+    if (_jobDispatcher == null) {
+      _jobDispatcher = new JobDispatcher();
+    }
+    _jobDispatcher.init(_manager);
+    _jobDispatcher.updateCache(clusterData);
+    _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
+    ResourceAssignment resourceAssignment =
+        _jobDispatcher.processJobStatusUpdateandAssignment(jobName, currStateOutput, taskIs);
+    LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
+        System.currentTimeMillis() - startTime,
+        jobName));
+    return resourceAssignment;
   }
 }

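The added lines in JobRebalancer show the shape of the refactor: the rebalancer keeps its pipeline entry point but delegates status updates and assignment to a lazily created JobDispatcher. The same lines annotated for emphasis, as a sketch that assumes _manager and _clusterStatusMonitor are fields of the enclosing rebalancer:

    if (_jobDispatcher == null) {
      _jobDispatcher = new JobDispatcher();      // created once, reused across pipeline runs
    }
    _jobDispatcher.init(_manager);               // rebind the Helix manager
    _jobDispatcher.updateCache(clusterData);     // refresh with this run's cluster snapshot
    _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
    ResourceAssignment resourceAssignment =
        _jobDispatcher.processJobStatusUpdateandAssignment(jobName, currStateOutput, taskIs);

Keeping the dispatcher as a field avoids reconstructing it on every rebalance, while init and updateCache ensure each invocation operates on the latest cluster state.
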
http://git-wip-us.apache.org/repos/asf/helix/blob/01076ca2/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index eee57d3..c7273ad 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -46,4 +46,7 @@ public class TaskConstants {
 
   public static final long DEFAULT_NEVER_TIMEOUT = -1; // never timeout
 
+  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
+
+
 }
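
The new PREV_RA_NODE constant names the znode under which a job's previous ResourceAssignment is persisted; the getPrevResourceAssignment and setPrevResourceAssignment methods shown earlier join it onto the per-job rebalancer context path. A minimal sketch of the path construction (the job name is hypothetical, manager is an assumed HelixManager reference, and the concrete value of REBALANCER_CONTEXT_ROOT is defined elsewhere in TaskConstants):

    // Same Joiner-based construction as in getPrevResourceAssignment / setPrevResourceAssignment.
    String path = Joiner.on("/").join(
        TaskConstants.REBALANCER_CONTEXT_ROOT,  // root of per-resource task context znodes
        "myJob",                                // hypothetical job (resource) name
        TaskConstants.PREV_RA_NODE);            // "PreviousResourceAssignment"
    ZNRecord record = manager.getHelixPropertyStore().get(path, null, AccessOption.PERSISTENT);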