You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[18/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
deleted file mode 100644
index fd1ec11..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
+++ /dev/null
@@ -1,1243 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataTree;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
-{
-  /**
-   * Result of a synchronous create attempt.
-   */
-  enum RetCode
-  {
-    /** the node was created */
-    OK,
-    /** the node was already present */
-    NODE_EXISTS,
-    /** invalid option or any other failure */
-    ERROR
-  }
-
-  private static Logger  LOG = Logger.getLogger(ZkBaseDataAccessor.class);
-
-  private final ZkClient _zkClient;
-
-  /**
-   * Builds an accessor that performs all of its operations through the given client.
-   *
-   * @param zkClient
-   *          client kept by this accessor and used by every method
-   */
-  public ZkBaseDataAccessor(ZkClient zkClient)
-  {
-    this._zkClient = zkClient;
-  }
-
-  /**
-   * Sync create: delegates to the four-argument create without tracking created paths.
-   *
-   * @return true only when the delegate reports {@code RetCode.OK}
-   */
-  @Override
-  public boolean create(String path, T record, int options)
-  {
-    RetCode rc = create(path, record, null, options);
-    return rc == RetCode.OK;
-  }
-
-  /**
-   * Sync create: creates the node at {@code path}, creating missing parent nodes first.
-   * Parents are created with null data and {@code AccessOption.PERSISTENT}.
-   *
-   * @param path
-   *          node to create
-   * @param record
-   *          data written to the new node
-   * @param pathCreated
-   *          if not null, every path created by this call is appended to it
-   * @param options
-   *          access option from which the create mode is derived
-   * @return {@code OK} if the node was created, {@code NODE_EXISTS} if it was already
-   *         there, {@code ERROR} on an invalid option or on any other failure,
-   *         including a failure to create a parent node
-   */
-  public RetCode create(String path, T record, List<String> pathCreated, int options)
-  {
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid create mode. options: " + options);
-      return RetCode.ERROR;
-    }
-
-    // every iteration either returns or has made sure the parent exists and tries again
-    while (true)
-    {
-      try
-      {
-        _zkClient.create(path, record, mode);
-        if (pathCreated != null)
-        {
-          pathCreated.add(path);
-        }
-        return RetCode.OK;
-      }
-      catch (ZkNoNodeException e)
-      {
-        // this will happen if parent node does not exist
-        String parentPath = new File(path).getParent();
-        RetCode parentRc;
-        try
-        {
-          parentRc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
-        }
-        catch (Exception e1)
-        {
-          LOG.error("Exception while creating path: " + parentPath, e1);
-          return RetCode.ERROR;
-        }
-
-        if (parentRc != RetCode.OK && parentRc != RetCode.NODE_EXISTS)
-        {
-          // the parent could not be created, so the child cannot be created either;
-          // report the failure instead of falling out of the loop with OK
-          LOG.error("Fail to create parent path: " + parentPath);
-          return RetCode.ERROR;
-        }
-        // parent node created/exists: loop around and retry the child
-      }
-      catch (ZkNodeExistsException e)
-      {
-        LOG.warn("Node already exists. path: " + path);
-        return RetCode.NODE_EXISTS;
-      }
-      catch (Exception e)
-      {
-        LOG.error("Exception while creating path: " + path, e);
-        return RetCode.ERROR;
-      }
-    }
-  }
-
-  /**
-   * sync set
-   */
-  @Override
-  public boolean set(String path, T record, int options)
-  {
-    return set(path, record, null, null, -1, options);
-  }
-
-  /**
-   * Sync set: writes {@code record} to {@code path}; if the node does not exist it is
-   * created instead (together with missing parents).
-   *
-   * @param path
-   *          node to write
-   * @param record
-   *          data to write
-   * @param pathsCreated
-   *          if not null, paths created while falling back to create are appended to it
-   * @param setstat
-   *          : if not null, receives a copy of the stat returned by the write; if node
-   *          is created instead of set, stat will NOT be set
-   * @param expectVersion
-   *          version passed to the write call
-   * @param options
-   *          access option; must map to a valid create mode
-   * @return true if the data was written or the node was created, false on an invalid
-   *         option or any other failure
-   * @throws ZkBadVersionException
-   *           rethrown unchanged when the write is rejected for a version mismatch
-   */
-  public boolean set(String path,
-                     T record,
-                     List<String> pathsCreated,
-                     Stat setstat,
-                     int expectVersion,
-                     int options)
-  {
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid set mode. options: " + options);
-      return false;
-    }
-
-    boolean retry;
-    do
-    {
-      retry = false;
-      try
-      {
-        Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
-        if (setstat != null)
-        {
-          DataTree.copyStat(setStat, setstat);
-        }
-      }
-      catch (ZkNoNodeException e)
-      {
-        // node not exists, try create. in this case, stat will not be set
-        try
-        {
-          RetCode rc = create(path, record, pathsCreated, options);
-          switch (rc)
-          {
-          case OK:
-            // not set stat if node is created (instead of set)
-            break;
-          case NODE_EXISTS:
-            // somebody else created the node in between: go back to writing it
-            retry = true;
-            break;
-          default:
-            LOG.error("Fail to set path by creating: " + path);
-            return false;
-          }
-        }
-        catch (Exception e1)
-        {
-          // log the exception raised by the create attempt, not the NoNode that led here
-          LOG.error("Exception while setting path by creating: " + path, e1);
-          return false;
-        }
-      }
-      catch (ZkBadVersionException e)
-      {
-        throw e;
-      }
-      catch (Exception e)
-      {
-        LOG.error("Exception while setting path: " + path, e);
-        return false;
-      }
-    }
-    while (retry);
-
-    return true;
-  }
-
-  /**
-   * Sync update: delegates to the five-argument update without tracking created paths
-   * or the resulting stat.
-   *
-   * @return true when the delegate returns non-null updated data
-   */
-  @Override
-  public boolean update(String path, DataUpdater<T> updater, int options)
-  {
-    T updated = update(path, updater, null, null, options);
-    return updated != null;
-  }
-
-  /**
-   * Sync update: reads the node, applies {@code updater} to its data and writes the
-   * result back using the version that was read. A version mismatch restarts the
-   * read-update-write cycle. If the node is missing, the updater is applied to null and
-   * the node is created with the result.
-   *
-   * @param createPaths
-   *          if not null, paths created while falling back to create are appended to it
-   * @param stat
-   *          if not null, receives a copy of the stat returned by the write (left
-   *          untouched when the node is created instead)
-   * @return: updatedData on success, or null on fail
-   */
-  public T update(String path,
-                  DataUpdater<T> updater,
-                  List<String> createPaths,
-                  Stat stat,
-                  int options)
-  {
-    if (AccessOption.getMode(options) == null)
-    {
-      LOG.error("Invalid update mode. options: " + options);
-      return null;
-    }
-
-    T result = null;
-    boolean again;
-    do
-    {
-      again = false;
-      try
-      {
-        Stat versionStat = new Stat();
-        T current = (T) _zkClient.readData(path, versionStat);
-        T candidate = updater.update(current);
-        Stat written =
-            _zkClient.writeDataGetStat(path, candidate, versionStat.getVersion());
-        if (stat != null)
-        {
-          DataTree.copyStat(written, stat);
-        }
-
-        result = candidate;
-      }
-      catch (ZkBadVersionException e)
-      {
-        // node changed between read and write: start over
-        again = true;
-      }
-      catch (ZkNoNodeException e)
-      {
-        // node is missing: build the initial value and create it
-        try
-        {
-          T candidate = updater.update(null);
-          RetCode rc = create(path, candidate, createPaths, options);
-          if (rc == RetCode.OK)
-          {
-            result = candidate;
-          }
-          else if (rc == RetCode.NODE_EXISTS)
-          {
-            again = true;
-          }
-          else
-          {
-            LOG.error("Fail to update path by creating: " + path);
-            return null;
-          }
-        }
-        catch (Exception e1)
-        {
-          LOG.error("Exception while updating path by creating: " + path, e1);
-          return null;
-        }
-      }
-      catch (Exception e)
-      {
-        LOG.error("Exception while updating path: " + path, e);
-        return null;
-      }
-    }
-    while (again);
-
-    return result;
-  }
-
-  /**
-   * Sync get: reads the data stored at {@code path}.
-   *
-   * @param stat
-   *          passed through to the client's read call
-   * @return the data read, or null when the node does not exist and the options do not
-   *         request an exception
-   * @throws ZkNoNodeException
-   *           when the node does not exist and
-   *           {@code AccessOption.isThrowExceptionIfNotExist(options)} is true
-   */
-  @Override
-  public T get(String path, Stat stat, int options)
-  {
-    try
-    {
-      return (T) _zkClient.readData(path, stat);
-    }
-    catch (ZkNoNodeException e)
-    {
-      boolean mustThrow = AccessOption.isThrowExceptionIfNotExist(options);
-      if (mustThrow)
-      {
-        throw e;
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Async get: reads every path in {@code paths}.
-   *
-   * @param paths
-   *          paths to read; null or empty yields an empty result
-   * @param stats
-   *          if not null, refilled with one stat per path (see the needRead variant)
-   * @param options
-   *          not used by this implementation
-   * @return one entry per path, null for paths whose read did not succeed
-   */
-  @Override
-  public List<T> get(List<String> paths, List<Stat> stats, int options)
-  {
-    // guard here as well: sizing the needRead array below would otherwise throw on a
-    // null list before the delegate's own null/empty check is reached
-    if (paths == null || paths.size() == 0)
-    {
-      return Collections.emptyList();
-    }
-
-    boolean[] needRead = new boolean[paths.size()];
-    Arrays.fill(needRead, true);
-
-    return get(paths, stats, needRead);
-  }
-
-  /**
-   * Async get: reads the data of several paths through asynchronous requests. All
-   * requests are issued first and only then awaited.
-   *
-   * @param paths
-   *          paths to read
-   * @param stats
-   *          if not null, cleared and refilled so that entry i holds the stat of path i,
-   *          or null when that path was skipped or its read did not return OK
-   * @param needRead
-   *          indexed like paths; path i is skipped when needRead[i] is false
-   * @return one entry per path: the deserialized data, or null when the path was
-   *         skipped or its read did not return OK; an empty list when paths is null or
-   *         empty
-   */
-  List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
-  {
-    if (paths == null || paths.size() == 0)
-    {
-      return Collections.emptyList();
-    }
-
-    // init stats: one null placeholder per path
-    if (stats != null)
-    {
-      stats.clear();
-      stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
-    }
-
-    long startT = System.nanoTime();
-
-    try
-    {
-      // issue async get requests; cbList[i] stays null for skipped paths
-      GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
-      for (int i = 0; i < paths.size(); i++)
-      {
-        if (!needRead[i])
-          continue;
-
-        String path = paths.get(i);
-        cbList[i] = new GetDataCallbackHandler();
-        _zkClient.asyncGetData(path, cbList[i]);
-      }
-
-      // wait for completion
-      // NOTE(review): waitForSuccess() is defined in ZkAsyncCallbacks (not visible
-      // here); presumably it blocks until the callback has fired - confirm
-      for (int i = 0; i < cbList.length; i++)
-      {
-        if (!needRead[i])
-          continue;
-
-        GetDataCallbackHandler cb = cbList[i];
-        cb.waitForSuccess();
-      }
-
-      // construct return results: start from all-null and fill in the successful reads
-      List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
-      for (int i = 0; i < paths.size(); i++)
-      {
-        if (!needRead[i])
-          continue;
-
-        GetDataCallbackHandler cb = cbList[i];
-        if (Code.get(cb.getRc()) == Code.OK)
-        {
-          @SuppressWarnings("unchecked")
-          T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
-          records.set(i, record);
-          if (stats != null)
-          {
-            stats.set(i, cb._stat);
-          }
-        }
-      }
-
-      return records;
-    }
-    finally
-    {
-      // log the elapsed time; paths is known to be non-empty at this point
-      long endT = System.nanoTime();
-      LOG.info("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Async getChildren: reads the data of all children of {@code parentPath}.
-   *
-   * Children whose read produced no stat are left out of the result.
-   *
-   * @param stats
-   *          if not null, cleared and refilled with the stats of the returned records,
-   *          in the same order
-   * @return the data of the children that could be read; an empty list when the parent
-   *         does not exist or has no children
-   */
-  @Override
-  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
-  {
-    try
-    {
-      List<String> childNames = getChildNames(parentPath, options);
-      if (childNames == null || childNames.size() == 0)
-      {
-        return Collections.emptyList();
-      }
-
-      // turn child names into full paths
-      List<String> childPaths = new ArrayList<String>();
-      for (String name : childNames)
-      {
-        childPaths.add(parentPath + "/" + name);
-      }
-
-      List<Stat> allStats = new ArrayList<Stat>(childPaths.size());
-      List<T> allRecords = get(childPaths, allStats, options);
-
-      // keep only the entries that came back with a stat
-      List<T> liveRecords = new ArrayList<T>();
-      List<Stat> liveStats = new ArrayList<Stat>();
-      for (int i = 0; i < allStats.size(); i++)
-      {
-        Stat childStat = allStats.get(i);
-        if (childStat != null)
-        {
-          liveRecords.add(allRecords.get(i));
-          liveStats.add(childStat);
-        }
-      }
-
-      if (stats != null)
-      {
-        stats.clear();
-        stats.addAll(liveStats);
-      }
-
-      return liveRecords;
-    }
-    catch (ZkNoNodeException e)
-    {
-      return Collections.emptyList();
-    }
-  }
-
-  /**
-   * Sync getChildNames: lists the children of {@code parentPath} in sorted order.
-   *
-   * @param options
-   *          not used by this implementation
-   * @return the sorted child names, or null if parentPath doesn't exist
-   */
-  @Override
-  public List<String> getChildNames(String parentPath, int options)
-  {
-    List<String> names;
-    try
-    {
-      names = _zkClient.getChildren(parentPath);
-    }
-    catch (ZkNoNodeException e)
-    {
-      return null;
-    }
-
-    Collections.sort(names);
-    return names;
-  }
-
-  /**
-   * sync exists
-   * 
-   */
-  @Override
-  public boolean exists(String path, int options)
-  {
-    return _zkClient.exists(path);
-  }
-
-  /**
-   * Sync getStat: returns whatever stat the client reports for {@code path}.
-   *
-   * @param options
-   *          not used by this implementation
-   */
-  @Override
-  public Stat getStat(String path, int options)
-  {
-    Stat nodeStat = _zkClient.getStat(path);
-    return nodeStat;
-  }
-
-  /**
-   * Sync remove: deletes {@code path}, falling back to a recursive delete when the
-   * plain delete raises a ZkException.
-   *
-   * @param options
-   *          not used by this implementation
-   * @return always true when the method returns normally
-   */
-  @Override
-  public boolean remove(String path, int options)
-  {
-    try
-    {
-      // try the cheap single-node delete first
-      _zkClient.delete(path);
-    }
-    catch (ZkException plainDeleteFailure)
-    {
-      _zkClient.deleteRecursive(path);
-    }
-
-    return true;
-  }
-
-  /**
-   * Async create: creates several nodes through asynchronous requests and gives up on
-   * any error other than NONODE. For paths that fail with NONODE the parent paths are
-   * created recursively (null data, {@code AccessOption.PERSISTENT}) and the children
-   * are retried.
-   *
-   * @param paths
-   *          paths to create
-   * @param records
-   *          data for each path; may be null, in which case every node gets null data
-   * @param needCreate
-   *          indexed like paths; only entries that are true are attempted. The array is
-   *          modified: an entry is set to false once its create finished with a result
-   *          other than NONODE
-   * @param pathsCreated
-   *          if not null, pathsCreated.get(i) collects the paths (including parents)
-   *          created on behalf of path i; a null entry is replaced by a new list
-   * @param options
-   *          access option from which the create mode is derived
-   * @return one callback per path holding the result of the last attempt, or null for
-   *         paths that were never attempted; all entries are null when the options are
-   *         invalid
-   * @throws IllegalArgumentException
-   *           if records, needCreate or pathsCreated differ in size from paths
-   */
-  CreateCallbackHandler[] create(List<String> paths,
-                                 List<T> records,
-                                 boolean[] needCreate,
-                                 List<List<String>> pathsCreated,
-                                 int options)
-  {
-    if ((records != null && records.size() != paths.size())
-        || needCreate.length != paths.size()
-        || (pathsCreated != null && pathsCreated.size() != paths.size()))
-    {
-      throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
-    }
-
-    CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
-
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid async set mode. options: " + options);
-      return cbList;
-    }
-
-    boolean retry;
-    do
-    {
-      retry = false;
-
-      // issue one async create per path that still needs it
-      for (int i = 0; i < paths.size(); i++)
-      {
-        if (!needCreate[i])
-          continue;
-
-        String path = paths.get(i);
-        T record = records == null ? null : records.get(i);
-        cbList[i] = new CreateCallbackHandler();
-        _zkClient.asyncCreate(path, record, mode, cbList[i]);
-      }
-
-      // parentPaths[i] is filled in only for paths that failed with NONODE
-      List<String> parentPaths =
-          new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
-      boolean failOnNoNode = false;
-
-      // collect the results of this round
-      for (int i = 0; i < paths.size(); i++)
-      {
-        if (!needCreate[i])
-          continue;
-
-        CreateCallbackHandler cb = cbList[i];
-        cb.waitForSuccess();
-        String path = paths.get(i);
-
-        if (Code.get(cb.getRc()) == Code.NONODE)
-        {
-          // parent is missing: remember it; needCreate[i] stays true for the retry
-          String parentPath = new File(path).getParent();
-          parentPaths.set(i, parentPath);
-          failOnNoNode = true;
-        }
-        else
-        {
-          // if create succeed or fail on error other than NONODE,
-          // give up
-          needCreate[i] = false;
-
-          // if succeeds, record what paths we've created
-          if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
-          {
-            if (pathsCreated.get(i) == null)
-            {
-              pathsCreated.set(i, new ArrayList<String>());
-            }
-            pathsCreated.get(i).add(path);
-          }
-        }
-      }
-
-      if (failOnNoNode)
-      {
-        // at this point needCreate is true exactly for the NONODE paths; work on a copy
-        // because the recursive call clears the flags it is given
-        boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
-
-        CreateCallbackHandler[] parentCbList =
-            create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
-        for (int i = 0; i < parentCbList.length; i++)
-        {
-          CreateCallbackHandler parentCb = parentCbList[i];
-          if (parentCb == null)
-            continue;
-
-          Code rc = Code.get(parentCb.getRc());
-
-          // if parent is created, retry create child
-          // (one usable parent is enough to trigger another round for all pending paths)
-          if (rc == Code.OK || rc == Code.NODEEXISTS)
-          {
-            retry = true;
-            break;
-          }
-        }
-      }
-    }
-    while (retry);
-
-    return cbList;
-  }
-
-
-  /**
-   * Async create: creates all given paths (and missing parents) with the given data.
-   *
-   * TODO: rename to create
-   *
-   * @param paths
-   *          paths to create; null or empty yields an empty result
-   * @param records
-   *          data for each path, indexed like paths
-   * @param options
-   *          access option from which the create mode is derived
-   * @return one flag per path, true when the create of that path returned OK; all false
-   *         when the options are invalid
-   */
-  @Override
-  public boolean[] createChildren(List<String> paths, List<T> records, int options)
-  {
-    // nothing to do; also keeps the timing log below from reading paths.get(0) on an
-    // empty list
-    if (paths == null || paths.size() == 0)
-    {
-      return new boolean[0];
-    }
-
-    boolean[] success = new boolean[paths.size()];
-
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid async create mode. options: " + options);
-      return success;
-    }
-
-    boolean[] needCreate = new boolean[paths.size()];
-    Arrays.fill(needCreate, true);
-    List<List<String>> pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
-
-    long startT = System.nanoTime();
-    try
-    {
-      CreateCallbackHandler[] cbList =
-          create(paths, records, needCreate, pathsCreated, options);
-
-      for (int i = 0; i < cbList.length; i++)
-      {
-        CreateCallbackHandler cb = cbList[i];
-        success[i] = (cb != null && Code.get(cb.getRc()) == Code.OK);
-      }
-
-      return success;
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * async set
-   * 
-   * TODO: rename to set
-   * 
-   */
-  @Override
-  public boolean[] setChildren(List<String> paths, List<T> records, int options)
-  {
-    return set(paths, records, null, null, options);
-  }
-
-  /**
-   * Async set: writes the given data to several paths and gives up on any error other
-   * than NoNode. Paths that do not exist are created instead (together with missing
-   * parents); if such a create reports that the node exists meanwhile, the write is
-   * retried.
-   *
-   * @param paths
-   *          paths to write; null or empty yields an empty result
-   * @param records
-   *          data for each path; may be null, in which case null data is written
-   * @param pathsCreated
-   *          if not null, entry i collects the paths created on behalf of path i
-   * @param stats
-   *          if not null, cleared and refilled with one entry per path: the stat
-   *          returned by the write, {@code ZNode.ZERO_STAT} when the node was created
-   *          instead, or null on failure
-   * @param options
-   *          access option; must map to a valid create mode
-   * @return one flag per path, true when the path was written or created
-   * @throws IllegalArgumentException
-   *           if records or pathsCreated differ in size from paths
-   */
-  boolean[] set(List<String> paths,
-                List<T> records,
-                List<List<String>> pathsCreated,
-                List<Stat> stats,
-                int options)
-  {
-    if (paths == null || paths.size() == 0)
-    {
-      return new boolean[0];
-    }
-
-    if ((records != null && records.size() != paths.size())
-        || (pathsCreated != null && pathsCreated.size() != paths.size()))
-    {
-      throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
-    }
-
-    boolean[] success = new boolean[paths.size()];
-
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid async set mode. options: " + options);
-      return success;
-    }
-
-    List<Stat> setStats =
-        new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
-    boolean[] needSet = new boolean[paths.size()];
-    Arrays.fill(needSet, true);
-
-    long startT = System.nanoTime();
-
-    try
-    {
-      boolean retry;
-      do
-      {
-        retry = false;
-
-        // callbacks of this round only; entries of paths that are already settled stay
-        // null so that results of earlier rounds are never looked at again
-        SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
-        for (int i = 0; i < paths.size(); i++)
-        {
-          if (!needSet[i])
-            continue;
-
-          String path = paths.get(i);
-          // records may legitimately be null (see the size check above)
-          T record = records == null ? null : records.get(i);
-          cbList[i] = new SetDataCallbackHandler();
-          _zkClient.asyncSetData(path, record, -1, cbList[i]);
-        }
-
-        boolean failOnNoNode = false;
-
-        for (int i = 0; i < cbList.length; i++)
-        {
-          SetDataCallbackHandler cb = cbList[i];
-          if (cb == null)
-            continue;
-
-          cb.waitForSuccess();
-          Code rc = Code.get(cb.getRc());
-          switch (rc)
-          {
-          case OK:
-            setStats.set(i, cb.getStat());
-            success[i] = true;
-            needSet[i] = false;
-            break;
-          case NONODE:
-            // if fail on NoNode, try create the node; needSet[i] stays true
-            failOnNoNode = true;
-            break;
-          default:
-            // if fail on error other than NoNode, give up
-            needSet[i] = false;
-            break;
-          }
-        }
-
-        // if failOnNoNode, try create
-        if (failOnNoNode)
-        {
-          // needSet is now true exactly for the NoNode paths; pass a copy because the
-          // create call clears the flags it is given
-          boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
-          CreateCallbackHandler[] createCbList =
-              create(paths, records, needCreate, pathsCreated, options);
-          for (int i = 0; i < createCbList.length; i++)
-          {
-            CreateCallbackHandler createCb = createCbList[i];
-            if (createCb == null)
-            {
-              continue;
-            }
-
-            Code rc = Code.get(createCb.getRc());
-            switch (rc)
-            {
-            case OK:
-              // record the outcome right away instead of re-reading callbacks after
-              // the loop: a later retry round replaces the create callbacks
-              setStats.set(i, ZNode.ZERO_STAT);
-              success[i] = true;
-              needSet[i] = false;
-              break;
-            case NODEEXISTS:
-              // node appeared in the meantime: go back to writing it
-              retry = true;
-              break;
-            default:
-              // if creation fails on error other than NodeExists
-              // no need to retry set
-              needSet[i] = false;
-              break;
-            }
-          }
-        }
-      }
-      while (retry);
-
-      if (stats != null)
-      {
-        stats.clear();
-        stats.addAll(setStats);
-      }
-
-      return success;
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + (endT - startT) + " ns");
-    }
-  }
-
-  // TODO: rename to update
-  /**
-   * Async update: applies one updater per path and reports which updates succeeded.
-   *
-   * @param paths
-   *          paths to update; null or empty yields an empty result
-   * @param updaters
-   *          updater for each path, indexed like paths
-   * @return one flag per path, true when the delegate returned non-null data for it
-   */
-  @Override
-  public boolean[] updateChildren(List<String> paths,
-                                  List<DataUpdater<T>> updaters,
-                                  int options)
-  {
-    List<T> updateData = update(paths, updaters, null, null, options);
-
-    // size the result from what the delegate returned: it has one entry per path, and
-    // is empty for null/empty paths (where paths.size() could not be called safely)
-    boolean[] success = new boolean[updateData.size()]; // init to false
-    for (int i = 0; i < success.length; i++)
-    {
-      success[i] = (updateData.get(i) != null);
-    }
-    return success;
-  }
-
-  /**
-   * Async update: for every path, reads the current data, applies the matching updater
-   * and writes the result back using the version that was read. Paths that do not
-   * exist are created with the updater's result for null input. Version conflicts, and
-   * creates that find the node present meanwhile, cause another round for the affected
-   * paths.
-   *
-   * @param paths
-   *          paths to update; null or empty yields an empty result
-   * @param updaters
-   *          updater for each path, indexed like paths
-   * @param pathsCreated
-   *          if not null, entry i collects the paths created on behalf of path i
-   * @param stats
-   *          if not null, cleared and refilled with one entry per path: the stat
-   *          returned by the write, {@code ZNode.ZERO_STAT} when the node was created
-   *          instead, or null on failure
-   * @param options
-   *          access option; must map to a valid create mode
-   * @return one entry per path: updatedData on success or null on fail
-   * @throws IllegalArgumentException
-   *           if updaters or pathsCreated differ in size from paths
-   */
-  List<T> update(List<String> paths,
-                 List<DataUpdater<T>> updaters,
-                 List<List<String>> pathsCreated,
-                 List<Stat> stats,
-                 int options)
-  {
-    if (paths == null || paths.size() == 0)
-    {
-      LOG.error("paths is null or empty");
-      return Collections.emptyList();
-    }
-
-    if (updaters.size() != paths.size()
-        || (pathsCreated != null && pathsCreated.size() != paths.size()))
-    {
-      throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
-    }
-
-    List<Stat> setStats =
-        new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
-    List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null)
-    {
-      LOG.error("Invalid update mode. options: " + options);
-      return updateData;
-    }
-
-    boolean[] needUpdate = new boolean[paths.size()];
-    Arrays.fill(needUpdate, true);
-
-    long startT = System.nanoTime();
-
-    try
-    {
-      boolean retry;
-      do
-      {
-        retry = false;
-        boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
-        boolean failOnNoNode = false;
-
-        // callbacks of this round only. Reusing the array across rounds would make the
-        // wait loop below re-process callbacks of already settled paths and overwrite
-        // their stored result with this round's null placeholder
-        SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
-
-        // async read all data that still needs updating
-        List<Stat> curStats = new ArrayList<Stat>();
-        List<T> curDataList =
-            get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
-
-        // async update
-        List<T> newDataList = new ArrayList<T>();
-        for (int i = 0; i < paths.size(); i++)
-        {
-          if (!needUpdate[i])
-          {
-            // placeholder keeps newDataList indexed like paths
-            newDataList.add(null);
-            continue;
-          }
-          String path = paths.get(i);
-          DataUpdater<T> updater = updaters.get(i);
-          T newData = updater.update(curDataList.get(i));
-          newDataList.add(newData);
-          Stat curStat = curStats.get(i);
-          if (curStat == null)
-          {
-            // node not exists
-            failOnNoNode = true;
-            needCreate[i] = true;
-          }
-          else
-          {
-            cbList[i] = new SetDataCallbackHandler();
-            _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
-          }
-        }
-
-        // wait for completion
-        boolean failOnBadVersion = false;
-
-        for (int i = 0; i < paths.size(); i++)
-        {
-          SetDataCallbackHandler cb = cbList[i];
-          if (cb == null)
-            continue;
-
-          cb.waitForSuccess();
-
-          switch (Code.get(cb.getRc()))
-          {
-          case OK:
-            updateData.set(i, newDataList.get(i));
-            setStats.set(i, cb.getStat());
-            needUpdate[i] = false;
-            break;
-          case NONODE:
-            failOnNoNode = true;
-            needCreate[i] = true;
-            break;
-          case BADVERSION:
-            // needUpdate[i] stays true so the path is read and written again
-            failOnBadVersion = true;
-            break;
-          default:
-            // if fail on error other than NoNode or BadVersion
-            // will not retry
-            needUpdate[i] = false;
-            break;
-          }
-        }
-
-        // if failOnNoNode, try create
-        if (failOnNoNode)
-        {
-          CreateCallbackHandler[] createCbList =
-              create(paths, newDataList, needCreate, pathsCreated, options);
-          for (int i = 0; i < paths.size(); i++)
-          {
-            CreateCallbackHandler createCb = createCbList[i];
-            if (createCb == null)
-            {
-              continue;
-            }
-
-            switch (Code.get(createCb.getRc()))
-            {
-            case OK:
-              needUpdate[i] = false;
-              updateData.set(i, newDataList.get(i));
-              setStats.set(i, ZNode.ZERO_STAT);
-              break;
-            case NODEEXISTS:
-              // node appeared in the meantime: update it in the next round
-              retry = true;
-              break;
-            default:
-              // if fail on error other than NodeExists
-              // will not retry
-              needUpdate[i] = false;
-              break;
-            }
-          }
-        }
-
-        // if failOnBadVersion, retry
-        if (failOnBadVersion)
-        {
-          retry = true;
-        }
-      }
-      while (retry);
-
-      if (stats != null)
-      {
-        stats.clear();
-        stats.addAll(setStats);
-      }
-
-      return updateData;
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("update_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + (endT - startT) + " ns");
-    }
-
-  }
-
-  /**
-   * Async exists: checks several paths at once.
-   *
-   * @param paths
-   *          paths to check; null or empty yields an empty result
-   * @return one flag per path, true when a stat was obtained for that path
-   */
-  @Override
-  public boolean[] exists(List<String> paths, int options)
-  {
-    Stat[] stats = getStats(paths, options);
-
-    // size the result from the stats array: it has one entry per path, and is empty
-    // for null/empty paths (where paths.size() could not be called safely)
-    boolean[] exists = new boolean[stats.length];
-    for (int i = 0; i < stats.length; i++)
-    {
-      exists[i] = (stats[i] != null);
-    }
-
-    return exists;
-  }
-
-  /**
-   * Async getStat: fetches the stats of several paths. All requests are issued before
-   * any of them is awaited.
-   *
-   * @param options
-   *          not used by this implementation
-   * @return one entry per path, taken from the exists callback of that path; an empty
-   *         array when paths is null or empty
-   */
-  @Override
-  public Stat[] getStats(List<String> paths, int options)
-  {
-    if (paths == null || paths.size() == 0)
-    {
-      LOG.error("paths is null or empty");
-      return new Stat[0];
-    }
-
-    final int count = paths.size();
-    Stat[] result = new Stat[count];
-
-    long begin = System.nanoTime();
-
-    try
-    {
-      // fire all requests
-      ExistsCallbackHandler[] handlers = new ExistsCallbackHandler[count];
-      for (int i = 0; i < count; i++)
-      {
-        ExistsCallbackHandler handler = new ExistsCallbackHandler();
-        handlers[i] = handler;
-        _zkClient.asyncExists(paths.get(i), handler);
-      }
-
-      // collect the answers in order
-      for (int i = 0; i < count; i++)
-      {
-        handlers[i].waitForSuccess();
-        result[i] = handlers[i]._stat;
-      }
-
-      return result;
-    }
-    finally
-    {
-      long elapsed = System.nanoTime() - begin;
-      LOG.info("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + elapsed + " ns");
-    }
-  }
-
-  /**
-   * Async remove: deletes several paths. All requests are issued before any of them is
-   * awaited.
-   *
-   * @param options
-   *          not used by this implementation
-   * @return one flag per path, true when the delete callback reported return code 0;
-   *         an empty array when paths is null or empty
-   */
-  @Override
-  public boolean[] remove(List<String> paths, int options)
-  {
-    if (paths == null || paths.size() == 0)
-    {
-      return new boolean[0];
-    }
-
-    final int count = paths.size();
-    boolean[] removed = new boolean[count];
-    DeleteCallbackHandler[] handlers = new DeleteCallbackHandler[count];
-
-    long begin = System.nanoTime();
-
-    try
-    {
-      // fire all requests
-      for (int i = 0; i < count; i++)
-      {
-        DeleteCallbackHandler handler = new DeleteCallbackHandler();
-        handlers[i] = handler;
-        _zkClient.asyncDelete(paths.get(i), handler);
-      }
-
-      // collect the answers in order
-      for (int i = 0; i < count; i++)
-      {
-        handlers[i].waitForSuccess();
-        removed[i] = (handlers[i].getRc() == 0);
-      }
-
-      return removed;
-    }
-    finally
-    {
-      long elapsed = System.nanoTime() - begin;
-      LOG.info("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
-          + ",... time: " + elapsed + " ns");
-    }
-  }
-
-  /**
-   * Registers {@code listener} for data changes of {@code path} by forwarding the call
-   * to the underlying client.
-   */
-  @Override
-  public void subscribeDataChanges(final String path, final IZkDataListener listener)
-  {
-    _zkClient.subscribeDataChanges(path, listener);
-  }
-
-  /**
-   * Removes {@code dataListener} from the data-change listeners of {@code path} by
-   * forwarding the call to the underlying client.
-   */
-  @Override
-  public void unsubscribeDataChanges(final String path, final IZkDataListener dataListener)
-  {
-    _zkClient.unsubscribeDataChanges(path, dataListener);
-  }
-
-  /**
-   * Subscribe to zookeeper child changes of {@code path} by forwarding the call to the
-   * underlying client.
-   *
-   * @return whatever the client's subscribeChildChanges call returns
-   */
-  @Override
-  public List<String> subscribeChildChanges(final String path, final IZkChildListener listener)
-  {
-    List<String> fromClient = _zkClient.subscribeChildChanges(path, listener);
-    return fromClient;
-  }
-
-  /**
-   * Unsubscribe from zookeeper child changes of {@code path} by forwarding the call to
-   * the underlying client.
-   */
-  @Override
-  public void unsubscribeChildChanges(final String path, final IZkChildListener childListener)
-  {
-    _zkClient.unsubscribeChildChanges(path, childListener);
-  }
-
-  /**
-   * Simple manual test: runs the async create, set and update paths against a
-   * ZooKeeper server at localhost:2191 and prints the outcome of each step.
-   */
-  public static void main(String[] args)
-  {
-    ZkClient zkclient = new ZkClient("localhost:2191");
-    try
-    {
-      zkclient.setZkSerializer(new ZNRecordSerializer());
-      ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
-      // test async create
-      List<String> createPaths =
-          Arrays.asList("/test/child1/child1", "/test/child2/child2");
-      List<ZNRecord> createRecords =
-          Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
-
-      boolean[] needCreate = new boolean[createPaths.size()];
-      Arrays.fill(needCreate, true);
-      List<List<String>> pathsCreated =
-          new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
-                                                                         null));
-      accessor.create(createPaths,
-                      createRecords,
-                      needCreate,
-                      pathsCreated,
-                      AccessOption.PERSISTENT);
-      System.out.println("pathsCreated: " + pathsCreated);
-
-      // test async set
-      List<String> setPaths =
-          Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
-      List<ZNRecord> setRecords =
-          Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
-
-      pathsCreated =
-          new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
-                                                                         null));
-      boolean[] success =
-          accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
-      System.out.println("pathsCreated: " + pathsCreated);
-      System.out.println("setSuccess: " + Arrays.toString(success));
-
-      // test async update
-      List<String> updatePaths =
-          Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
-
-      // updater that ignores the current data and always returns a fixed record
-      class TestUpdater implements DataUpdater<ZNRecord>
-      {
-        final ZNRecord _newData;
-
-        public TestUpdater(ZNRecord newData)
-        {
-          _newData = newData;
-        }
-
-        @Override
-        public ZNRecord update(ZNRecord currentData)
-        {
-          return _newData;
-        }
-      }
-      List<DataUpdater<ZNRecord>> updaters =
-          Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
-                        (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
-
-      pathsCreated =
-          new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
-                                                                         null));
-
-      List<ZNRecord> updateRecords =
-          accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
-      for (int i = 0; i < updatePaths.size(); i++)
-      {
-        success[i] = updateRecords.get(i) != null;
-      }
-      System.out.println("pathsCreated: " + pathsCreated);
-      System.out.println("updateSuccess: " + Arrays.toString(success));
-    }
-    finally
-    {
-      // close the client even when one of the steps above throws
-      System.out.println("CLOSING");
-      zkclient.close();
-    }
-  }
-
-  /**
-   * Reset: this accessor has nothing to reset, so the method is a no-op.
-   */
-  @Override
-  public void reset()
-  {
-    // intentionally empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
deleted file mode 100644
index 41f8bab..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ /dev/null
@@ -1,984 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataTree;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor.RetCode;
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.HelixPropertyStore;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
-{
-  private static final Logger    LOG          =
-                                                  Logger.getLogger(ZkCacheBaseDataAccessor.class);
-
-  protected WriteThroughCache<T> _wtCache;
-  protected ZkCallbackCache<T>   _zkCache;
-
-  final ZkBaseDataAccessor<T>    _baseAccessor;
-  final Map<String, Cache<T>>    _cacheMap;
-
-  final String                   _chrootPath;
-  final List<String>             _wtCachePaths;
-  final List<String>             _zkCachePaths;
-
-  final HelixGroupCommit<T>      _groupCommit = new HelixGroupCommit<T>();
-
-  // fire listeners
-  private final ReentrantLock    _eventLock   = new ReentrantLock();
-  private ZkCacheEventThread     _eventThread;
-
-  private ZkClient               _zkclient    = null;
-
-  public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
-                                 List<String> wtCachePaths)
-  {
-    this(baseAccessor, null, wtCachePaths, null);
-  }
-
-  public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
-                                 String chrootPath,
-                                 List<String> wtCachePaths,
-                                 List<String> zkCachePaths)
-  {
-    _baseAccessor = baseAccessor;
-
-    if (chrootPath == null || chrootPath.equals("/"))
-    {
-      _chrootPath = null;
-    }
-    else
-    {
-      PathUtils.validatePath(chrootPath);
-      _chrootPath = chrootPath;
-    }
-
-    _wtCachePaths = wtCachePaths;
-    _zkCachePaths = zkCachePaths;
-
-    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
-    // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
-    // comes first
-    _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
-    {
-      @Override
-      public int compare(String o1, String o2)
-      {
-        int len1 = o1.split("/").length;
-        int len2 = o2.split("/").length;
-        return len1 - len2;
-      }
-    });
-
-    start();
-  }
-
-  public ZkCacheBaseDataAccessor(String zkAddress,
-                                 ZkSerializer serializer,
-                                 String chrootPath,
-                                 List<String> wtCachePaths,
-                                 List<String> zkCachePaths)
-  {
-    _zkclient =
-        new ZkClient(zkAddress,
-                     ZkClient.DEFAULT_SESSION_TIMEOUT,
-                     ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-                     serializer);
-    _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-                                 TimeUnit.MILLISECONDS);
-    _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
-
-    if (chrootPath == null || chrootPath.equals("/"))
-    {
-      _chrootPath = null;
-    }
-    else
-    {
-      PathUtils.validatePath(chrootPath);
-      _chrootPath = chrootPath;
-    }
-
-    _wtCachePaths = wtCachePaths;
-    _zkCachePaths = zkCachePaths;
-
-    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
-    // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
-    // comes first
-    _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
-    {
-      @Override
-      public int compare(String o1, String o2)
-      {
-        int len1 = o1.split("/").length;
-        int len2 = o2.split("/").length;
-        return len1 - len2;
-      }
-    });
-
-    start();
-  }
-
-  private String prependChroot(String clientPath)
-  {
-    if (_chrootPath != null)
-    {
-      // handle clientPath = "/"
-      if (clientPath.length() == 1)
-      {
-        return _chrootPath;
-      }
-      return _chrootPath + clientPath;
-    }
-    else
-    {
-      return clientPath;
-    }
-  }
-
-  private List<String> prependChroot(List<String> clientPaths)
-  {
-    List<String> serverPaths = new ArrayList<String>();
-    for (String clientPath : clientPaths)
-    {
-      serverPaths.add(prependChroot(clientPath));
-    }
-    return serverPaths;
-  }
-
-  /**
-   * find the first path in paths that is a descendant
-   */
-  private String firstCachePath(List<String> paths)
-  {
-    for (String cachePath : _cacheMap.keySet())
-    {
-      for (String path : paths)
-      {
-        if (path.startsWith(cachePath))
-        {
-          return path;
-        }
-      }
-    }
-    return null;
-  }
-
-  private Cache<T> getCache(String path)
-  {
-    for (String cachePath : _cacheMap.keySet())
-    {
-      if (path.startsWith(cachePath))
-      {
-        return _cacheMap.get(cachePath);
-      }
-    }
-
-    return null;
-  }
-
-  private Cache<T> getCache(List<String> paths)
-  {
-    Cache<T> cache = null;
-    for (String path : paths)
-    {
-      for (String cachePath : _cacheMap.keySet())
-      {
-        if (cache == null && path.startsWith(cachePath))
-        {
-          cache = _cacheMap.get(cachePath);
-        }
-        else if (cache != null && cache != _cacheMap.get(cachePath))
-        {
-          throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
-              + paths);
-        }
-      }
-    }
-
-    return cache;
-  }
-
-  private void updateCache(Cache<T> cache,
-                           List<String> createPaths,
-                           boolean success,
-                           String updatePath,
-                           T data,
-                           Stat stat)
-  {
-    if (createPaths == null || createPaths.isEmpty())
-    {
-      if (success)
-      {
-        cache.update(updatePath, data, stat);
-      }
-    }
-    else
-    {
-      String firstPath = firstCachePath(createPaths);
-      if (firstPath != null)
-      {
-        cache.updateRecursive(firstPath);
-      }
-    }
-  }
-
-  @Override
-  public boolean create(String path, T data, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-        List<String> pathsCreated = new ArrayList<String>();
-        RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
-        boolean success = (rc == RetCode.OK);
-
-        updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.create(serverPath, data, options);
-  }
-
-  @Override
-  public boolean set(String path, T data, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-        Stat setStat = new Stat();
-        List<String> pathsCreated = new ArrayList<String>();
-        boolean success =
-            _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
-
-        updateCache(cache, pathsCreated, success, serverPath, data, setStat);
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.set(serverPath, data, options);
-  }
-
-  @Override
-  public boolean update(String path, DataUpdater<T> updater, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-        Stat setStat = new Stat();
-        List<String> pathsCreated = new ArrayList<String>();
-        T updateData =
-            _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
-        boolean success = (updateData != null);
-        updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
-    // return _baseAccessor.update(serverPath, updater, options);
-  }
-
-  @Override
-  public boolean exists(String path, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      boolean exists = cache.exists(serverPath);
-      if (exists)
-      {
-        return true;
-      }
-    }
-
-    // if not exists in cache, always fall back to zk
-    return _baseAccessor.exists(serverPath, options);
-  }
-
-  @Override
-  public boolean remove(String path, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-
-        boolean success = _baseAccessor.remove(serverPath, options);
-        if (success)
-        {
-          cache.purgeRecursive(serverPath);
-        }
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.remove(serverPath, options);
-  }
-
-  @Override
-  public T get(String path, Stat stat, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      T record = null;
-      ZNode znode = cache.get(serverPath);
-
-      if (znode != null)
-      {
-        // TODO: shall return a deep copy instead of reference
-        record = ((T) znode.getData());
-        if (stat != null)
-        {
-          DataTree.copyStat(znode.getStat(), stat);
-        }
-        return record;
-
-      }
-      else
-      {
-        // if cache miss, fall back to zk and update cache
-        try
-        {
-          cache.lockWrite();
-          record = _baseAccessor.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-          cache.update(serverPath, record, stat);
-        }
-        catch (ZkNoNodeException e)
-        {
-          if (AccessOption.isThrowExceptionIfNotExist(options))
-          {
-            throw e;
-          }
-        }
-        finally
-        {
-          cache.unlockWrite();
-        }
-
-        return record;
-      }
-    }
-
-    // no cache
-    return _baseAccessor.get(serverPath, stat, options);
-  }
-
-  @Override
-  public Stat getStat(String path, int options)
-  {
-    String clientPath = path;
-    String serverPath = prependChroot(clientPath);
-
-    Cache<T> cache = getCache(serverPath);
-    if (cache != null)
-    {
-      Stat stat = new Stat();
-      ZNode znode = cache.get(serverPath);
-
-      if (znode != null)
-      {
-        return znode.getStat();
-
-      }
-      else
-      {
-        // if cache miss, fall back to zk and update cache
-        try
-        {
-          cache.lockWrite();
-          T data = _baseAccessor.get(serverPath, stat, options);
-          cache.update(serverPath, data, stat);
-        }
-        catch (ZkNoNodeException e)
-        {
-          return null;
-        }
-        finally
-        {
-          cache.unlockWrite();
-        }
-
-        return stat;
-      }
-    }
-
-    // no cache
-    return _baseAccessor.getStat(serverPath, options);
-  }
-
-  @Override
-  public boolean[] createChildren(List<String> paths, List<T> records, int options)
-  {
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    Cache<T> cache = getCache(serverPaths);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-        boolean[] needCreate = new boolean[size];
-        Arrays.fill(needCreate, true);
-        List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        CreateCallbackHandler[] createCbList =
-            _baseAccessor.create(serverPaths,
-                                 records,
-                                 needCreate,
-                                 pathsCreatedList,
-                                 options);
-
-        boolean[] success = new boolean[size];
-        for (int i = 0; i < size; i++)
-        {
-          CreateCallbackHandler cb = createCbList[i];
-          success[i] = (Code.get(cb.getRc()) == Code.OK);
-
-          updateCache(cache,
-                      pathsCreatedList.get(i),
-                      success[i],
-                      serverPaths.get(i),
-                      records.get(i),
-                      ZNode.ZERO_STAT);
-        }
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.createChildren(serverPaths, records, options);
-  }
-
-  @Override
-  public boolean[] setChildren(List<String> paths, List<T> records, int options)
-  {
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    Cache<T> cache = getCache(serverPaths);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-        List<Stat> setStats = new ArrayList<Stat>();
-        List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        boolean[] success =
-            _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
-
-        for (int i = 0; i < size; i++)
-        {
-          updateCache(cache,
-                      pathsCreatedList.get(i),
-                      success[i],
-                      serverPaths.get(i),
-                      records.get(i),
-                      setStats.get(i));
-        }
-
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    return _baseAccessor.setChildren(serverPaths, records, options);
-  }
-
-  @Override
-  public boolean[] updateChildren(List<String> paths,
-                                  List<DataUpdater<T>> updaters,
-                                  int options)
-  {
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    Cache<T> cache = getCache(serverPaths);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-
-        List<Stat> setStats = new ArrayList<Stat>();
-        boolean[] success = new boolean[size];
-        List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        List<T> updateData =
-            _baseAccessor.update(serverPaths,
-                                 updaters,
-                                 pathsCreatedList,
-                                 setStats,
-                                 options);
-
-        // System.out.println("updateChild: ");
-        // for (T data : updateData)
-        // {
-        // System.out.println(data);
-        // }
-
-        for (int i = 0; i < size; i++)
-        {
-          success[i] = (updateData.get(i) != null);
-          updateCache(cache,
-                      pathsCreatedList.get(i),
-                      success[i],
-                      serverPaths.get(i),
-                      updateData.get(i),
-                      setStats.get(i));
-        }
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.updateChildren(serverPaths, updaters, options);
-  }
-
-  // TODO: change to use async_exists
-  @Override
-  public boolean[] exists(List<String> paths, int options)
-  {
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    boolean exists[] = new boolean[size];
-    for (int i = 0; i < size; i++)
-    {
-      exists[i] = exists(serverPaths.get(i), options);
-    }
-    return exists;
-  }
-
-  @Override
-  public boolean[] remove(List<String> paths, int options)
-  {
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    Cache<T> cache = getCache(serverPaths);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockWrite();
-
-        boolean[] success = _baseAccessor.remove(serverPaths, options);
-
-        for (int i = 0; i < size; i++)
-        {
-          if (success[i])
-          {
-            cache.purgeRecursive(serverPaths.get(i));
-          }
-        }
-        return success;
-      }
-      finally
-      {
-        cache.unlockWrite();
-      }
-    }
-
-    // no cache
-    return _baseAccessor.remove(serverPaths, options);
-  }
-
-  @Override
-  public List<T> get(List<String> paths, List<Stat> stats, int options)
-  {
-    if (paths == null || paths.isEmpty())
-    {
-      return Collections.emptyList();
-    }
-
-    final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
-
-    List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
-    List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
-
-    boolean needRead = false;
-    boolean needReads[] = new boolean[size]; // init to false
-
-    Cache<T> cache = getCache(serverPaths);
-    if (cache != null)
-    {
-      try
-      {
-        cache.lockRead();
-        for (int i = 0; i < size; i++)
-        {
-          ZNode zNode = cache.get(serverPaths.get(i));
-          if (zNode != null)
-          {
-            // TODO: shall return a deep copy instead of reference
-            records.set(i, (T) zNode.getData());
-            readStats.set(i, zNode.getStat());
-          }
-          else
-          {
-            needRead = true;
-            needReads[i] = true;
-          }
-        }
-      }
-      finally
-      {
-        cache.unlockRead();
-      }
-
-      // cache miss, fall back to zk and update cache
-      if (needRead)
-      {
-        cache.lockWrite();
-        try
-        {
-          List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
-          for (int i = 0; i < size; i++)
-          {
-            if (needReads[i])
-            {
-              records.set(i, readRecords.get(i));
-              cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
-            }
-          }
-        }
-        finally
-        {
-          cache.unlockWrite();
-        }
-      }
-
-      if (stats != null)
-      {
-        stats.clear();
-        stats.addAll(readStats);
-      }
-
-      return records;
-    }
-
-    // no cache
-    return _baseAccessor.get(serverPaths, stats, options);
-  }
-
-  // TODO: add cache
-  @Override
-  public Stat[] getStats(List<String> paths, int options)
-  {
-    List<String> serverPaths = prependChroot(paths);
-    return _baseAccessor.getStats(serverPaths, options);
-  }
-
-  @Override
-  public List<String> getChildNames(String parentPath, int options)
-  {
-    String serverParentPath = prependChroot(parentPath);
-
-    Cache<T> cache = getCache(serverParentPath);
-    if (cache != null)
-    {
-      // System.out.println("zk-cache");
-      ZNode znode = cache.get(serverParentPath);
-
-      if (znode != null && znode.getChildSet() != Collections.<String> emptySet())
-      {
-        // System.out.println("zk-cache-hit: " + parentPath);
-        List<String> childNames = new ArrayList<String>(znode.getChildSet());
-        Collections.sort(childNames);
-        return childNames;
-      }
-      else
-      {
-        // System.out.println("zk-cache-miss");
-        try
-        {
-          cache.lockWrite();
-
-          List<String> childNames =
-              _baseAccessor.getChildNames(serverParentPath, options);
-          // System.out.println("\t--" + childNames);
-          cache.addToParentChildSet(serverParentPath, childNames);
-
-          return childNames;
-        }
-        finally
-        {
-          cache.unlockWrite();
-        }
-      }
-    }
-
-    // no cache
-    return _baseAccessor.getChildNames(serverParentPath, options);
-  }
-
-  @Override
-  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
-  {
-    List<String> childNames = getChildNames(parentPath, options);
-    if (childNames == null)
-    {
-      return null;
-    }
-
-    List<String> paths = new ArrayList<String>();
-    for (String childName : childNames)
-    {
-      String path = parentPath + "/" + childName;
-      paths.add(path);
-    }
-
-    return get(paths, stats, options);
-  }
-
-  @Override
-  public void subscribeDataChanges(String path, IZkDataListener listener)
-  {
-    String serverPath = prependChroot(path);
-
-    _baseAccessor.subscribeDataChanges(serverPath, listener);
-  }
-
-  @Override
-  public void unsubscribeDataChanges(String path, IZkDataListener listener)
-  {
-    String serverPath = prependChroot(path);
-
-    _baseAccessor.unsubscribeDataChanges(serverPath, listener);
-  }
-
-  @Override
-  public List<String> subscribeChildChanges(String path, IZkChildListener listener)
-  {
-    String serverPath = prependChroot(path);
-
-    return _baseAccessor.subscribeChildChanges(serverPath, listener);
-  }
-
-  @Override
-  public void unsubscribeChildChanges(String path, IZkChildListener listener)
-  {
-    String serverPath = prependChroot(path);
-
-    _baseAccessor.unsubscribeChildChanges(serverPath, listener);
-  }
-
-  @Override
-  public void subscribe(String parentPath, HelixPropertyListener listener)
-  {
-    String serverPath = prependChroot(parentPath);
-    _zkCache.subscribe(serverPath, listener);
-  }
-
-  @Override
-  public void unsubscribe(String parentPath, HelixPropertyListener listener)
-  {
-    String serverPath = prependChroot(parentPath);
-    _zkCache.unsubscribe(serverPath, listener);
-  }
-
-  @Override
-  public void start()
-  {
-
-    LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
-        + ", " + _zkCachePaths);
-
-    // start event thread
-    try
-    {
-      _eventLock.lockInterruptibly();
-      if (_eventThread != null)
-      {
-        LOG.warn(_eventThread + " has already started");
-      }
-      else
-      {
-
-        if (_zkCachePaths == null || _zkCachePaths.isEmpty())
-        {
-          LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
-        }
-        else
-        {
-          LOG.debug("Starting ZkCacheEventThread...");
-
-          _eventThread = new ZkCacheEventThread("");
-          _eventThread.start();
-        }
-      }
-    }
-    catch (InterruptedException e)
-    {
-      LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
-    }
-    finally
-    {
-      _eventLock.unlock();
-    }
-    LOG.debug("Start ZkCacheEventThread...done");
-
-    _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
-    _zkCache =
-        new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
-
-    if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
-    {
-      for (String path : _wtCachePaths)
-      {
-        _cacheMap.put(path, _wtCache);
-      }
-    }
-
-    if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
-    {
-      for (String path : _zkCachePaths)
-      {
-        _cacheMap.put(path, _zkCache);
-      }
-    }
-  }
-
-  @Override
-  public void stop()
-  {
-    try
-    {
-      _eventLock.lockInterruptibly();
-
-      if (_zkclient != null)
-      {
-        _zkclient.close();
-        _zkclient = null;
-      }
-
-      if (_eventThread == null)
-      {
-        LOG.warn(_eventThread + " has already stopped");
-        return;
-      }
-
-      LOG.debug("Stopping ZkCacheEventThread...");
-      _eventThread.interrupt();
-      _eventThread.join(2000);
-      _eventThread = null;
-    }
-    catch (InterruptedException e)
-    {
-      LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
-    }
-    finally
-    {
-      _eventLock.unlock();
-    }
-
-    LOG.debug("Stop ZkCacheEventThread...done");
-
-  }
-  
-  @Override
-  public void reset()
-  {
-    if (_wtCache != null)
-    {
-      _wtCache.reset();
-    }
-    
-    if (_zkCache != null)
-    {
-      _zkCache.reset();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
deleted file mode 100644
index 4d3dc21..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.log4j.Logger;
-
-// copy from ZkEventThread
-public class ZkCacheEventThread extends Thread
-{
-
-  private static final Logger          LOG      =
-                                                    Logger.getLogger(ZkCacheEventThread.class);
-  private final BlockingQueue<ZkCacheEvent> _events  = new LinkedBlockingQueue<ZkCacheEvent>();
-  private static AtomicInteger         _eventId = new AtomicInteger(0);
-
-  static abstract class ZkCacheEvent
-  {
-
-    private final String _description;
-
-    public ZkCacheEvent(String description)
-    {
-      _description = description;
-    }
-
-    public abstract void run() throws Exception;
-
-    @Override
-    public String toString()
-    {
-      return "ZkCacheEvent[" + _description + "]";
-    }
-  }
-
-  ZkCacheEventThread(String name)
-  {
-    setDaemon(true);
-    setName("ZkCache-EventThread-" + getId() + "-" + name);
-  }
-
-  @Override
-  public void run()
-  {
-    LOG.info("Starting ZkCache event thread.");
-    try
-    {
-      while (!isInterrupted())
-      {
-        ZkCacheEvent zkEvent = _events.take();
-        int eventId = _eventId.incrementAndGet();
-        LOG.debug("Delivering event #" + eventId + " " + zkEvent);
-        try
-        {
-          zkEvent.run();
-        }
-        catch (InterruptedException e)
-        {
-          interrupt();
-        }
-        catch (ZkInterruptedException e)
-        {
-          interrupt();
-        }
-        catch (Throwable e)
-        {
-          LOG.error("Error handling event " + zkEvent, e);
-        }
-        LOG.debug("Delivering event #" + eventId + " done");
-      }
-    }
-    catch (InterruptedException e)
-    {
-      LOG.info("Terminate ZkClient event thread.");
-    }
-  }
-
-  public void send(ZkCacheEvent event)
-  {
-    if (!isInterrupted())
-    {
-      LOG.debug("New event: " + event);
-      _events.add(event);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
deleted file mode 100644
index aa3c559..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
+++ /dev/null
@@ -1,348 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkCallbackCache<T> extends Cache<T> implements
-    IZkChildListener,
-    IZkDataListener,
-    IZkStateListener
-{
-  private static Logger LOG = Logger.getLogger(ZkCallbackCache.class);
-
-  final BaseDataAccessor<T> _accessor;
-  final String _chrootPath;
-
-  private final ZkCacheEventThread _eventThread;
-  private final Map<String, Set<HelixPropertyListener>> _listener;
-
-  public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath,
-                         List<String> paths, ZkCacheEventThread eventThread)
-  {
-    super();
-    _accessor = accessor;
-    _chrootPath = chrootPath;
-
-    _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
-    _eventThread = eventThread;
-
-    // init cache
-    // System.out.println("init cache: " + paths);
-    if (paths != null && !paths.isEmpty())
-    {
-      for (String path : paths)
-      {
-        updateRecursive(path);
-      }
-    }
-  }
-
-  @Override
-  public void update(String path, T data, Stat stat)
-  {
-    String parentPath = new File(path).getParent();
-    String childName = new File(path).getName();
-
-    addToParentChildSet(parentPath, childName);
-    ZNode znode = _cache.get(path);
-    if (znode == null)
-    {
-      _cache.put(path, new ZNode(path, data, stat));
-      fireEvents(path, EventType.NodeCreated);
-    }
-    else
-    {
-      Stat oldStat = znode.getStat();
-
-      znode.setData(data);
-      znode.setStat(stat);
-      // System.out.println("\t\t--setData. path: " + path + ", data: " + data);
-
-      if (oldStat.getCzxid() != stat.getCzxid())
-      {
-        fireEvents(path, EventType.NodeDeleted);
-        fireEvents(path, EventType.NodeCreated);
-      }
-      else if (oldStat.getVersion() != stat.getVersion())
-      {
-        // System.out.println("\t--fireNodeChanged: " + path + ", oldVersion: " +
-        // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
-        fireEvents(path, EventType.NodeDataChanged);
-      }
-    }
-  }
-
-  // TODO: make readData async
-  @Override
-  public void updateRecursive(String path)
-  {
-    if (path == null)
-    {
-      return;
-    }
-
-    try
-    {
-      _lock.writeLock().lock();
-      try
-      {
-        // subscribe changes before read
-        _accessor.subscribeDataChanges(path, this);
-
-        // update this node
-        Stat stat = new Stat();
-        T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
-        update(path, readData, stat);
-      }
-      catch (ZkNoNodeException e)
-      {
-        // OK. znode not exists
-        // we still need to subscribe child change
-      }
-
-      // recursively update children nodes if not exists
-      // System.out.println("subcribeChildChange: " + path);
-      ZNode znode = _cache.get(path);
-      List<String> childNames = _accessor.subscribeChildChanges(path, this);
-      if (childNames != null && !childNames.isEmpty())
-      {
-        for (String childName : childNames)
-        {
-          if (!znode.hasChild(childName))
-          {
-            String childPath = path + "/" + childName;
-            znode.addChild(childName);
-            updateRecursive(childPath);
-          }
-        }
-      }
-    }
-    finally
-    {
-      _lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
-  {
-    // System.out.println("handleChildChange: " + parentPath + ", " + currentChilds);
-
-    // this is invoked if subscribed for childChange and node gets deleted
-    if (currentChilds == null)
-    {
-      return;
-    }
-
-    updateRecursive(parentPath);
-  }
-
-  @Override
-  public void handleDataChange(String dataPath, Object data) throws Exception
-  {
-    // System.out.println("handleDataChange: " + dataPath);
-    try
-    {
-      _lock.writeLock().lock();
-
-      // TODO: optimize it by get stat from callback
-      Stat stat = new Stat();
-      Object readData =
-          _accessor.get(dataPath, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
-      ZNode znode = _cache.get(dataPath);
-      if (znode != null)
-      {
-        Stat oldStat = znode.getStat();
-
-        // System.out.println("handleDataChange: " + dataPath + ", data: " + data);
-        // System.out.println("handleDataChange: " + dataPath + ", oldCzxid: " +
-        // oldStat.getCzxid() + ", newCzxid: " + stat.getCzxid()
-        // + ", oldVersion: " + oldStat.getVersion() + ", newVersion: " +
-        // stat.getVersion());
-        znode.setData(readData);
-        znode.setStat(stat);
-
-        // if create right after delete, and zkCallback comes after create
-        // no DataDelete() will be fired, instead will fire 2 DataChange()
-        // see ZkClient.fireDataChangedEvents()
-        if (oldStat.getCzxid() != stat.getCzxid())
-        {
-          fireEvents(dataPath, EventType.NodeDeleted);
-          fireEvents(dataPath, EventType.NodeCreated);
-        }
-        else if (oldStat.getVersion() != stat.getVersion())
-        {
-          // System.out.println("\t--fireNodeChanged: " + dataPath + ", oldVersion: " +
-          // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
-          fireEvents(dataPath, EventType.NodeDataChanged);
-        }
-      }
-      else
-      {
-        // we may see dataChange on child before childChange on parent
-        // in this case, let childChange update cache
-      }
-    }
-    finally
-    {
-      _lock.writeLock().unlock();
-    }
-
-  }
-
-  @Override
-  public void handleDataDeleted(String dataPath) throws Exception
-  {
-    // System.out.println("handleDataDeleted: " + dataPath);
-
-    try
-    {
-      _lock.writeLock().lock();
-      _accessor.unsubscribeDataChanges(dataPath, this);
-      _accessor.unsubscribeChildChanges(dataPath, this);
-
-      String parentPath = new File(dataPath).getParent();
-      String name = new File(dataPath).getName();
-      removeFromParentChildSet(parentPath, name);
-      _cache.remove(dataPath);
-
-      fireEvents(dataPath, EventType.NodeDeleted);
-    }
-    finally
-    {
-      _lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void handleStateChanged(KeeperState state) throws Exception
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void handleNewSession() throws Exception
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  public void subscribe(String path, HelixPropertyListener listener)
-  {
-    synchronized (_listener)
-    {
-      Set<HelixPropertyListener> listeners = _listener.get(path);
-      if (listeners == null)
-      {
-        listeners = new CopyOnWriteArraySet<HelixPropertyListener>();
-        _listener.put(path, listeners);
-      }
-      listeners.add(listener);
-    }
-  }
-
-  public void unsubscribe(String path, HelixPropertyListener childListener)
-  {
-    synchronized (_listener)
-    {
-      final Set<HelixPropertyListener> listeners = _listener.get(path);
-      if (listeners != null)
-      {
-        listeners.remove(childListener);
-      }
-    }
-  }
-
-  private void fireEvents(final String path, EventType type)
-  {
-    String tmpPath = path;
-    final String clientPath =
-        (_chrootPath == null ? path : (_chrootPath.equals(path) ? "/"
-            : path.substring(_chrootPath.length())));
-
-    while (tmpPath != null)
-    {
-      Set<HelixPropertyListener> listeners = _listener.get(tmpPath);
-
-      if (listeners != null && !listeners.isEmpty())
-      {
-        for (final HelixPropertyListener listener : listeners)
-        {
-          try
-          {
-            switch (type)
-            {
-            case NodeDataChanged:
-              // listener.onDataChange(path);
-              _eventThread.send(new ZkCacheEvent("dataChange on " + path + " send to "
-                  + listener)
-              {
-                @Override
-                public void run() throws Exception
-                {
-                  listener.onDataChange(clientPath);
-                }
-              });
-              break;
-            case NodeCreated:
-              // listener.onDataCreate(path);
-              _eventThread.send(new ZkCacheEvent("dataCreate on " + path + " send to "
-                  + listener)
-              {
-                @Override
-                public void run() throws Exception
-                {
-                  listener.onDataCreate(clientPath);
-                }
-              });
-              break;
-            case NodeDeleted:
-              // listener.onDataDelete(path);
-              _eventThread.send(new ZkCacheEvent("dataDelete on " + path + " send to "
-                  + listener)
-              {
-                @Override
-                public void run() throws Exception
-                {
-                  listener.onDataDelete(clientPath);
-                }
-              });
-              break;
-            default:
-              break;
-            }
-          }
-          catch (Exception e)
-          {
-            LOG.error("Exception in handle events.", e);
-          }
-        }
-      }
-
-      tmpPath = new File(tmpPath).getParent();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
deleted file mode 100644
index 8912154..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.SerializableSerializer;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
-
-/**
- * Extension of the I0Itec {@code ZkClient} used for quick fixes and additional
- * features that cannot wait for a new ZkClient jar. Ideally the changes made
- * here should be committed back to ZkClient.
- * <p>
- * On top of the base client this class serializes through a
- * {@link PathBasedZkSerializer}, logs the duration of the operations it
- * overrides, and exposes asynchronous calls on the underlying ZooKeeper handle.
- *
- * @author kgopalak
- *
- */
-
-public class ZkClient extends org.I0Itec.zkclient.ZkClient
-{
-  private static final Logger LOG = Logger.getLogger(ZkClient.class);
-  public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
-  public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
-  // public static String sessionId;
-  // public static String sessionPassword;
-
-  // serializer actually used by this class; the superclass only ever sees raw bytes
-  private PathBasedZkSerializer _zkSerializer;
-
-  /**
-   * Main constructor; all other constructors delegate here. The superclass is
-   * given a {@code ByteArraySerializer}, (de)serialization is done by this
-   * class with the supplied path-based serializer. The caller's stack trace is
-   * logged to help track where clients are created.
-   */
-  public ZkClient(IZkConnection connection, int connectionTimeout,
-                  PathBasedZkSerializer zkSerializer)
-  {
-    super(connection, connectionTimeout, new ByteArraySerializer());
-    _zkSerializer = zkSerializer;
-
-    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-    LOG.info("create a new zkclient. " + Arrays.asList(calls));
-  }
-
-  /** Wraps the plain serializer in a {@code BasicZkSerializer}. */
-  public ZkClient(IZkConnection connection, int connectionTimeout,
-                  ZkSerializer zkSerializer)
-  {
-    this(connection, connectionTimeout, new BasicZkSerializer(zkSerializer));
-  }
-
-  /** Uses a {@code SerializableSerializer}. */
-  public ZkClient(IZkConnection connection, int connectionTimeout)
-  {
-    this(connection, connectionTimeout, new SerializableSerializer());
-  }
-
-  /** Uses a {@code SerializableSerializer} and {@code Integer.MAX_VALUE} as connection timeout. */
-  public ZkClient(IZkConnection connection)
-  {
-    this(connection, Integer.MAX_VALUE, new SerializableSerializer());
-  }
-
-  /** Opens a new {@code ZkConnection} to the given servers. */
-  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-                  ZkSerializer zkSerializer)
-  {
-    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
-  }
-
-  /** Opens a new {@code ZkConnection} to the given servers. */
-  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-                  PathBasedZkSerializer zkSerializer)
-  {
-    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
-  }
-
-  /** Opens a new {@code ZkConnection} and uses a {@code SerializableSerializer}. */
-  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
-  {
-    this(new ZkConnection(zkServers, sessionTimeout),
-         connectionTimeout,
-         new SerializableSerializer());
-  }
-
-  /** Opens a new {@code ZkConnection} and uses a {@code SerializableSerializer}. */
-  public ZkClient(String zkServers, int connectionTimeout)
-  {
-    this(new ZkConnection(zkServers), connectionTimeout, new SerializableSerializer());
-  }
-
-  /**
-   * Opens a new {@code ZkConnection}, uses a {@code SerializableSerializer} and
-   * {@code Integer.MAX_VALUE} as connection timeout.
-   */
-  public ZkClient(String zkServers)
-  {
-    this(new ZkConnection(zkServers), Integer.MAX_VALUE, new SerializableSerializer());
-  }
-
-  /** Replaces the serializer, wrapping the plain one in a {@code BasicZkSerializer}. */
-  @Override
-  public void setZkSerializer(ZkSerializer zkSerializer)
-  {
-    _zkSerializer = new BasicZkSerializer(zkSerializer);
-  }
-
-  /** Replaces the serializer. */
-  public void setZkSerializer(PathBasedZkSerializer zkSerializer)
-  {
-    _zkSerializer = zkSerializer;
-  }
-
-  /** @return the connection held by the superclass */
-  public IZkConnection getConnection()
-  {
-    return _connection;
-  }
-
-  /**
-   * Logs the connection being closed together with the caller's stack trace,
-   * then closes the client.
-   */
-  @Override
-  public void close() throws ZkInterruptedException
-  {
-    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-
-    // The constructors accept any IZkConnection, so the connection is only
-    // cast when it really is a ZkConnection. An unconditional cast made
-    // close() fail with ClassCastException before super.close() was reached.
-    Object zookeeper;
-    if (_connection == null)
-    {
-      zookeeper = "null";
-    }
-    else if (_connection instanceof ZkConnection)
-    {
-      zookeeper = ((ZkConnection) _connection).getZookeeper();
-    }
-    else
-    {
-      zookeeper = _connection;
-    }
-    LOG.info("closing a zkclient. zookeeper: " + zookeeper
-        + ", callStack: " + Arrays.asList(calls));
-
-    super.close();
-  }
-
-  /**
-   * Looks up the stat of a path without setting a watch.
-   *
-   * @return whatever ZooKeeper's {@code exists(path, false)} returns for the path
-   */
-  public Stat getStat(final String path)
-  {
-    long startT = System.nanoTime();
-
-    try
-    {
-      Stat stat = retryUntilConnected(new Callable<Stat>()
-      {
-
-        @Override
-        public Stat call() throws Exception
-        {
-          Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
-          return stat;
-        }
-      });
-
-      return stat;
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  // override exists(path, watch), so we can record all exists requests
-  @Override
-  protected boolean exists(final String path, final boolean watch)
-  {
-    long startT = System.nanoTime();
-
-    try
-    {
-      return retryUntilConnected(new Callable<Boolean>()
-      {
-        @Override
-        public Boolean call() throws Exception
-        {
-          return _connection.exists(path, watch);
-        }
-      });
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  // override getChildren(path, watch), so we can record all getChildren requests
-  @Override
-  protected List<String> getChildren(final String path, final boolean watch)
-  {
-    long startT = System.nanoTime();
-
-    try
-    {
-      return retryUntilConnected(new Callable<List<String>>()
-      {
-        @Override
-        public List<String> call() throws Exception
-        {
-          return _connection.getChildren(path, watch);
-        }
-      });
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("getChildren, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Deserializes bytes read from {@code path} with the path-based serializer.
-   *
-   * @return the deserialized object, or null when {@code data} is null
-   */
-  @SuppressWarnings("unchecked")
-  public <T extends Object> T deserialize(byte[] data, String path)
-  {
-    if (data == null)
-    {
-      return null;
-    }
-    return (T) _zkSerializer.deserialize(data, path);
-  }
-
-  // override readData(path, stat, watch), so we can record all read requests
-  @Override
-  @SuppressWarnings("unchecked")
-  protected <T extends Object> T readData(final String path,
-                                          final Stat stat,
-                                          final boolean watch)
-  {
-    long startT = System.nanoTime();
-    try
-    {
-      byte[] data = retryUntilConnected(new Callable<byte[]>()
-      {
-
-        @Override
-        public byte[] call() throws Exception
-        {
-          return _connection.readData(path, stat, watch);
-        }
-      });
-      return (T) deserialize(data, path);
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("getData, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Reads data and stat of a path.
-   *
-   * @param returnNullIfPathNotExists when true a missing node yields null,
-   *                                  otherwise the {@code ZkNoNodeException} is rethrown
-   * @return the data, or null as described above
-   */
-  @SuppressWarnings("unchecked")
-  public <T extends Object> T readDataAndStat(String path,
-                                              Stat stat,
-                                              boolean returnNullIfPathNotExists)
-  {
-    T data = null;
-    try
-    {
-      data = (T) super.readData(path, stat);
-    }
-    catch (ZkNoNodeException e)
-    {
-      if (!returnNullIfPathNotExists)
-      {
-        throw e;
-      }
-    }
-    return data;
-  }
-
-  /** @return the server string reported by the connection */
-  public String getServers()
-  {
-    return _connection.getServers();
-  }
-
-  /** Serializes data destined for {@code path} with the path-based serializer. */
-  public byte[] serialize(Object data, String path)
-  {
-    return _zkSerializer.serialize(data, path);
-  }
-
-  /**
-   * Serializes the data with the path-based serializer and writes it with the
-   * expected version, logging the elapsed time.
-   */
-  @Override
-  public void writeData(final String path, Object datat, final int expectedVersion)
-  {
-    long startT = System.nanoTime();
-    try
-    {
-      final byte[] data = serialize(datat, path);
-
-      retryUntilConnected(new Callable<Object>()
-      {
-
-        @Override
-        public Object call() throws Exception
-        {
-          _connection.writeData(path, data, expectedVersion);
-          return null;
-        }
-      });
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("setData, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Writes data and returns the stat produced by the write. Unlike
-   * {@link #writeData(String, Object, int)} this calls the ZooKeeper handle
-   * directly and is not wrapped in {@code retryUntilConnected}.
-   *
-   * @throws ZkException wrapping any {@code KeeperException} raised by ZooKeeper
-   */
-  public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) throws InterruptedException
-  {
-    Stat stat = null;
-    long start = System.nanoTime();
-    try
-    {
-      byte[] bytes = _zkSerializer.serialize(datat, path);
-      stat =
-          ((ZkConnection) _connection).getZookeeper().setData(path,
-                                                              bytes,
-                                                              expectedVersion);
-      return stat;
-    }
-    catch (KeeperException e)
-    {
-      throw ZkException.create(e);
-    }
-    finally
-    {
-      long end = System.nanoTime();
-      LOG.info("setData, path: " + path + ", time: " + (end - start) + " ns");
-    }
-  }
-
-  /**
-   * Creates a node; null data is passed on as null bytes without serializing.
-   *
-   * @throws NullPointerException if {@code path} is null
-   */
-  @Override
-  public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException,
-      IllegalArgumentException,
-      ZkException,
-      RuntimeException
-  {
-    if (path == null)
-    {
-      throw new NullPointerException("path must not be null.");
-    }
-
-    long startT = System.nanoTime();
-    try
-    {
-      final byte[] bytes = data == null ? null : serialize(data, path);
-
-      return retryUntilConnected(new Callable<String>()
-      {
-
-        @Override
-        public String call() throws Exception
-        {
-          return _connection.create(path, bytes, mode);
-        }
-      });
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("create, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Deletes a node.
-   *
-   * @return true if the delete went through, false if the node did not exist
-   */
-  @Override
-  public boolean delete(final String path)
-  {
-    long startT = System.nanoTime();
-    try
-    {
-      try
-      {
-        retryUntilConnected(new Callable<Object>()
-        {
-
-          @Override
-          public Object call() throws Exception
-          {
-            _connection.delete(path);
-            return null;
-          }
-        });
-
-        return true;
-      }
-      catch (ZkNoNodeException e)
-      {
-        return false;
-      }
-    }
-    finally
-    {
-      long endT = System.nanoTime();
-      LOG.info("delete, path: " + path + ", time: " + (endT - startT) + " ns");
-    }
-  }
-
-  /**
-   * Asynchronous create on the underlying ZooKeeper handle with
-   * {@code Ids.OPEN_ACL_UNSAFE}; null data is sent as null bytes.
-   */
-  public void asyncCreate(final String path,
-                          Object datat,
-                          CreateMode mode,
-                          CreateCallbackHandler cb)
-  {
-    byte[] data = null;
-    if (datat != null)
-    {
-      data = serialize(datat, path);
-    }
-    ((ZkConnection) _connection).getZookeeper().create(path, data, Ids.OPEN_ACL_UNSAFE, // Arrays.asList(DEFAULT_ACL),
-                                                       mode,
-                                                       cb,
-                                                       null);
-  }
-
-  /** Asynchronous setData on the underlying ZooKeeper handle. */
-  public void asyncSetData(final String path,
-                           Object datat,
-                           int version,
-                           SetDataCallbackHandler cb)
-  {
-    final byte[] data = serialize(datat, path);
-    ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, null);
-
-  }
-
-  /** Asynchronous getData on the underlying ZooKeeper handle, without a watcher. */
-  public void asyncGetData(final String path, GetDataCallbackHandler cb)
-  {
-    ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, null);
-  }
-
-  /** Asynchronous exists on the underlying ZooKeeper handle, without a watcher. */
-  public void asyncExists(final String path, ExistsCallbackHandler cb)
-  {
-    ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, null);
-
-  }
-
-  /** Asynchronous delete on the underlying ZooKeeper handle, passing -1 as the version. */
-  public void asyncDelete(String path, DeleteCallbackHandler cb)
-  {
-    ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
deleted file mode 100644
index 2c4f269..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-/**
- * State listener that records, for one {@code ZKHelixManager}, whether the
- * ZooKeeper connection is currently up and whether the session has expired.
- */
-public class ZkStateChangeListener implements IZkStateListener
-{
-  private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
-
-  private final ZKHelixManager _zkHelixManager;
-  private volatile boolean _isConnected;
-  private volatile boolean _hasSessionExpired;
-
-  /**
-   * @param zkHelixManager manager whose connection state is tracked and which
-   *                       is notified about new sessions
-   */
-  public ZkStateChangeListener(ZKHelixManager zkHelixManager)
-  {
-    _zkHelixManager = zkHelixManager;
-  }
-
-  /**
-   * Marks the connection as up and the session as not expired, then lets the
-   * manager handle the new session.
-   */
-  @Override
-  public void handleNewSession()
-  {
-    // TODO:bug in zkclient .
-    // zkclient does not invoke handleStateChanged when a session expires but
-    // directly invokes handleNewSession
-    _hasSessionExpired = false;
-    _isConnected = true;
-    _zkHelixManager.handleNewSession();
-  }
-
-  /**
-   * Logs the new state and updates the connected / expired flags for
-   * SyncConnected, Disconnected and Expired; other states are ignored.
-   */
-  @Override
-  public void handleStateChanged(KeeperState keeperState) throws Exception
-  {
-    switch (keeperState)
-    {
-    case SyncConnected:
-    {
-      ZkConnection connection = (ZkConnection) _zkHelixManager._zkClient.getConnection();
-      logger.info("KeeperState: " + keeperState + ", zookeeper:" + connection.getZookeeper());
-      _isConnected = true;
-      break;
-    }
-    case Disconnected:
-      logger.info(describeSession(keeperState, ", disconnectedSessionId: "));
-      _isConnected = false;
-      break;
-    case Expired:
-      logger.info(describeSession(keeperState, ", expiredSessionId: "));
-      _isConnected = false;
-      _hasSessionExpired = true;
-      break;
-    default:
-      break;
-    }
-  }
-
-  /**
-   * Builds the log line shared by the Disconnected and Expired cases.
-   *
-   * @param keeperState  state being reported
-   * @param sessionLabel text placed in front of the manager's session id
-   */
-  private String describeSession(KeeperState keeperState, String sessionLabel)
-  {
-    return "KeeperState:" + keeperState + sessionLabel
-        + _zkHelixManager._sessionId + ", instance: "
-        + _zkHelixManager.getInstanceName() + ", type: "
-        + _zkHelixManager.getInstanceType();
-  }
-
-  /** @return the last recorded connection flag */
-  boolean isConnected()
-  {
-    return _isConnected;
-  }
-
-  /** Marks the connection as down. */
-  void disconnect()
-  {
-    _isConnected = false;
-  }
-
-  /** @return true once an Expired state was seen and no new session has been handled since */
-  boolean hasSessionExpired()
-  {
-    return _hasSessionExpired;
-  }
-}